You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/01 17:17:06 UTC
[2/3] ignite git commit: IGNITE-9420 Move logical recovery outside of
PME - Fixes #5067.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index fb4ec1e..ce01431 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -70,8 +70,6 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.EventType;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
@@ -82,9 +80,8 @@ import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.DirectMemoryRegion;
-import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
-import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
@@ -96,19 +93,21 @@ import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
@@ -125,17 +124,16 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.Metas
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
-import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -167,11 +165,9 @@ import static java.nio.file.StandardOpenOption.READ;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
-import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
-import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
/**
@@ -182,6 +178,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = "IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC";
+ /** Name of the system property read into {@code skipCheckpointOnNodeStop}; presumably lets the node skip the checkpoint on stop (consumer not visible here - confirm). */
+ public static final String IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP = "IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP";
+
/** MemoryPolicyConfiguration name reserved for meta store. */
public static final String METASTORE_DATA_REGION_NAME = "metastoreMemPlc";
@@ -199,6 +198,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private final String throttlingPolicyOverride = IgniteSystemProperties.getString(
IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
+ /** Value of the {@link #IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP} system property, read when this manager is created; {@code false} is passed as the default. */
+ private final boolean skipCheckpointOnNodeStop = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);
+
/** Checkpoint lock hold count. */
private static final ThreadLocal<Integer> CHECKPOINT_LOCK_HOLD_COUNT = new ThreadLocal<Integer>() {
@Override protected Integer initialValue() {
@@ -454,6 +456,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cfg.setInitialSize(storageCfg.getSystemRegionInitialSize());
cfg.setMaxSize(storageCfg.getSystemRegionMaxSize());
cfg.setPersistenceEnabled(true);
+
return cfg;
}
@@ -496,15 +499,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
.resolveFolders()
.getLockedFileLockHolder();
- fileLockHolder = preLocked == null ?
- new FileLockHolder(storeMgr.workDir().getPath(), kernalCtx, log) : preLocked;
-
- if (log.isDebugEnabled())
- log.debug("Try to capture file lock [nodeId=" +
- cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
-
- if (!fileLockHolder.isLocked())
- fileLockHolder.tryLock(lockWaitTime);
+ acquireFileLock(preLocked);
cleanupTempCheckpointDirectory();
@@ -532,17 +527,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public void cleanupRestoredCaches() {
- if (dataRegionMap == null)
+ if (dataRegionMap.isEmpty())
return;
for (CacheGroupDescriptor grpDesc : cctx.cache().cacheGroupDescriptors().values()) {
String regionName = grpDesc.config().getDataRegionName();
- DataRegion region = dataRegionMap.get(regionName == null ? DFLT_DATA_REG_DEFAULT_NAME : regionName);
+ DataRegion region = regionName != null ? dataRegionMap.get(regionName) : dfltDataRegion;
if (region == null)
continue;
+ if (log.isInfoEnabled())
+ log.info("Page memory " + region + " for " + grpDesc + " has invalidated.");
+
int partitions = grpDesc.config().getAffinity().partitions();
if (region.pageMemory() instanceof PageMemoryEx) {
@@ -550,6 +548,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
for (int partId = 0; partId < partitions; partId++)
memEx.invalidate(grpDesc.groupId(), partId);
+
+ memEx.invalidate(grpDesc.groupId(), PageIdAllocator.INDEX_PARTITION);
}
}
@@ -579,6 +579,39 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * @param preLocked Pre-locked file lock holder.
+ */
+ private void acquireFileLock(FileLockHolder preLocked) throws IgniteCheckedException {
+ // Client nodes never take the file lock.
+ if (cctx.kernalContext().clientNode())
+ return;
+
+ // Reuse the holder handed over by the caller, otherwise create one for the store work directory.
+ if (preLocked != null)
+ fileLockHolder = preLocked;
+ else
+ fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);
+
+ // Nothing left to do when the holder already reports the lock as taken.
+ if (fileLockHolder.isLocked())
+ return;
+
+ if (log.isDebugEnabled())
+ log.debug("Try to capture file lock [nodeId=" +
+ cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
+
+ fileLockHolder.tryLock(lockWaitTime);
+ }
+
+ /**
+ *
+ */
+ private void releaseFileLock() {
+ // Only server nodes that actually created a holder have something to release.
+ boolean hasHolder = !cctx.kernalContext().clientNode() && fileLockHolder != null;
+
+ if (hasHolder) {
+ if (log.isDebugEnabled())
+ log.debug("Release file lock [nodeId=" +
+ cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
+
+ fileLockHolder.close();
+ }
+ }
+
+ /**
* Retreives checkpoint history form specified {@code dir}.
*
* @return List of checkpoints.
@@ -661,40 +694,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
private void readMetastore() throws IgniteCheckedException {
try {
- DataStorageConfiguration memCfg = cctx.kernalContext().config().getDataStorageConfiguration();
-
- DataRegionConfiguration plcCfg = createMetastoreDataRegionConfig(memCfg);
-
- File allocPath = buildAllocPath(plcCfg);
-
- DirectMemoryProvider memProvider = allocPath == null ?
- new UnsafeMemoryProvider(log) :
- new MappedFileMemoryProvider(
- log,
- allocPath);
-
- DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(plcCfg);
-
- PageMemoryEx storePageMem = (PageMemoryEx)createPageMemory(memProvider, memCfg, plcCfg, memMetrics, false);
-
- DataRegion regCfg = new DataRegion(storePageMem, plcCfg, memMetrics, createPageEvictionTracker(plcCfg, storePageMem));
-
CheckpointStatus status = readCheckpointStatus();
- cctx.pageStore().initializeForMetastorage();
-
- storePageMem.start();
-
checkpointReadLock();
try {
- restoreMemory(status, true, storePageMem, Collections.emptySet());
+ dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().start();
- metaStorage = new MetaStorage(cctx, regCfg, memMetrics, true);
+ performBinaryMemoryRestore(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false);
- metaStorage.init(this);
+ metaStorage = createMetastorage(true);
- applyLastUpdates(status, true);
+ applyLogicalUpdates(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false);
fillWalDisabledGroups();
@@ -703,7 +714,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
finally {
metaStorage = null;
- storePageMem.stop(true);
+ dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().stop(false);
cctx.pageStore().cleanupPageStoreIfMatch(new Predicate<Integer>() {
@Override public boolean test(Integer grpId) {
@@ -729,21 +740,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
snapshotMgr = cctx.snapshot();
- if (!cctx.kernalContext().clientNode() && persistenceCfg.getCheckpointThreads() > 1) {
- asyncRunner = new IgniteThreadPoolExecutor(
- CHECKPOINT_RUNNER_THREAD_PREFIX,
- cctx.igniteInstanceName(),
- persistenceCfg.getCheckpointThreads(),
- persistenceCfg.getCheckpointThreads(),
- 30_000,
- new LinkedBlockingQueue<>()
- );
- }
-
- if (checkpointer == null)
+ if (!cctx.kernalContext().clientNode() && checkpointer == null)
checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
super.onActivate(ctx);
+
+ if (!cctx.kernalContext().clientNode()) {
+ initializeCheckpointPool();
+
+ finishRecovery();
+ }
}
/** {@inheritDoc} */
@@ -760,6 +766,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
stopping = false;
}
+ /**
+ *
+ */
+ private void initializeCheckpointPool() {
+ int cpThreads = persistenceCfg.getCheckpointThreads();
+
+ // The runner pool is created only when more than one checkpoint thread is configured.
+ if (cpThreads <= 1)
+ return;
+
+ asyncRunner = new IgniteThreadPoolExecutor(
+ CHECKPOINT_RUNNER_THREAD_PREFIX,
+ cctx.igniteInstanceName(),
+ cpThreads,
+ cpThreads,
+ 30_000,
+ new LinkedBlockingQueue<Runnable>()
+ );
+ }
+
/** {@inheritDoc} */
@Override protected void registerMetricsMBeans(IgniteConfiguration cfg) {
super.registerMetricsMBeans(cfg);
@@ -798,8 +819,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
};
}
- /** {@inheritDoc} */
- @Override public void onDoneRestoreBinaryMemory() throws IgniteCheckedException {
+ /**
+ * Restores last valid WAL pointer and resumes logging from that pointer.
+ * Re-creates metastorage if needed.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ private void finishRecovery() throws IgniteCheckedException {
assert !cctx.kernalContext().clientNode();
long time = System.currentTimeMillis();
@@ -810,37 +836,24 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
lsnr.beforeResumeWalLogging(this);
- cctx.pageStore().initializeForMetastorage();
-
- CheckpointStatus status = readCheckpointStatus();
-
- // Binary memory should be recovered at startup.
- assert !status.needRestoreMemory() : status;
-
- WALPointer statusEndPtr = CheckpointStatus.NULL_PTR.equals(status.endPtr) ? null : status.endPtr;
+ // Try to resume logging since last finished checkpoint if possible.
+ if (walTail == null) {
+ CheckpointStatus status = readCheckpointStatus();
- // If binary memory recovery occurs resume from the last walTail in the other case from END checkpoint.
- WALPointer walPtr = walTail == null ? statusEndPtr : walTail;
+ walTail = CheckpointStatus.NULL_PTR.equals(status.endPtr) ? null : status.endPtr;
+ }
- cctx.wal().resumeLogging(walPtr);
+ cctx.wal().resumeLogging(walTail);
walTail = null;
- metaStorage = new MetaStorage(
- cctx,
- dataRegionMap.get(METASTORE_DATA_REGION_NAME),
- (DataRegionMetricsImpl)memMetricsMap.get(METASTORE_DATA_REGION_NAME),
- false
- );
-
- // Init metastore only after WAL logging resumed. Can't do it earlier because
- // MetaStorage first initialization also touches WAL, look at #isWalDeltaRecordNeeded.
- metaStorage.init(this);
+ // Recreate metastorage to refresh page memory state after deactivation.
+ if (metaStorage == null)
+ metaStorage = createMetastorage(false);
notifyMetastorageReadyForReadWrite();
- for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
- lsnr.afterMemoryRestore(this);
+ U.log(log, "Finish recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
}
catch (IgniteCheckedException e) {
if (X.hasCause(e, StorageException.class, IOException.class))
@@ -850,17 +863,35 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
finally {
checkpointReadUnlock();
-
- U.log(log, "Resume logging performed in " + (System.currentTimeMillis() - time) + " ms.");
}
}
/**
- * @param cacheGrps Cache groups to restore.
+ * @param readOnly Metastorage read-only mode.
+ * @return Instance of Metastorage.
+ * @throws IgniteCheckedException If failed to create metastorage.
+ */
+ private MetaStorage createMetastorage(boolean readOnly) throws IgniteCheckedException {
+ // Prepare the page store for metastorage before the storage object is built.
+ cctx.pageStore().initializeForMetastorage();
+
+ DataRegion metaRegion = dataRegion(METASTORE_DATA_REGION_NAME);
+
+ DataRegionMetricsImpl metaMetrics = (DataRegionMetricsImpl) memMetricsMap.get(METASTORE_DATA_REGION_NAME);
+
+ MetaStorage created = new MetaStorage(cctx, metaRegion, metaMetrics, readOnly);
+
+ // The returned instance is always initialized against this manager.
+ created.init(this);
+
+ return created;
+ }
+
+ /**
+ * @param cacheGroupsPredicate Cache groups to restore.
* @return Last seen WAL pointer during binary memory recovery.
* @throws IgniteCheckedException If failed.
*/
- protected WALPointer restoreBinaryMemory(Set<Integer> cacheGrps) throws IgniteCheckedException {
+ private WALPointer restoreBinaryMemory(Predicate<Integer> cacheGroupsPredicate) throws IgniteCheckedException {
assert !cctx.kernalContext().clientNode();
long time = System.currentTimeMillis();
@@ -868,28 +899,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointReadLock();
try {
- for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
- lsnr.beforeBinaryMemoryRestore(this);
-
- cctx.pageStore().initializeForMetastorage();
-
CheckpointStatus status = readCheckpointStatus();
// First, bring memory to the last consistent checkpoint state if needed.
// This method should return a pointer to the last valid record in the WAL.
- WALPointer tailWalPtr = restoreMemory(status,
- false,
- (PageMemoryEx)dataRegionMap.get(METASTORE_DATA_REGION_NAME).pageMemory(),
- cacheGrps);
+ WALPointer restored = performBinaryMemoryRestore(status, cacheGroupsPredicate, true);
- if (tailWalPtr == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) {
+ if (restored == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) {
throw new StorageException("The memory cannot be restored. The critical part of WAL archive is missing " +
- "[tailWalPtr=" + tailWalPtr + ", endPtr=" + status.endPtr + ']');
+ "[tailWalPtr=" + restored + ", endPtr=" + status.endPtr + ']');
}
- nodeStart(tailWalPtr);
+ nodeStart(restored);
+
+ if (log.isInfoEnabled())
+ log.info("Binary recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
- return tailWalPtr;
+ return restored;
}
catch (IgniteCheckedException e) {
if (X.hasCause(e, StorageException.class, IOException.class))
@@ -899,9 +925,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
finally {
checkpointReadUnlock();
-
- if (log.isInfoEnabled())
- log.info("Binary recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
}
}
@@ -1015,21 +1038,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
MBEAN_GROUP,
MBEAN_NAME
);
+
+ metaStorage = null;
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
super.stop0(cancel);
- if (!cctx.kernalContext().clientNode()) {
- if (fileLockHolder != null) {
- if (log.isDebugEnabled())
- log.debug("Release file lock [nodeId=" +
- cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
-
- fileLockHolder.close();
- }
- }
+ releaseFileLock();
}
/** */
@@ -1322,34 +1339,30 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- @Override public boolean beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
- DiscoveryEvent discoEvt = fut.firstEvent();
+ @Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+ // Try to restore partition states.
+ if (fut.localJoinExchange() || fut.activateCluster()
+ || (fut.exchangeActions() != null && !F.isEmpty(fut.exchangeActions().cacheGroupsToStart()))) {
+ U.doInParallel(
+ cctx.kernalContext().getSystemExecutorService(),
+ cctx.cache().cacheGroups(),
+ cacheGroup -> {
+ if (cacheGroup.isLocal())
+ return;
+
+ cctx.database().checkpointReadLock();
- boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED;
-
- boolean locNode = discoEvt.eventNode().isLocal();
-
- boolean isSrvNode = !cctx.kernalContext().clientNode();
-
- boolean clusterInTransitionStateToActive = fut.activateCluster();
-
- boolean restored = false;
-
- long time = System.currentTimeMillis();
-
- // In case of cluster activation or local join restore, restore whole manager state.
- if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode)) {
- restoreState();
-
- restored = true;
- }
- // In case of starting groups, restore partition states only for these groups.
- else if (fut.exchangeActions() != null && !F.isEmpty(fut.exchangeActions().cacheGroupsToStart())) {
- Set<Integer> restoreGroups = fut.exchangeActions().cacheGroupsToStart().stream()
- .map(actionData -> actionData.descriptor().groupId())
- .collect(Collectors.toSet());
+ try {
+ cacheGroup.restorePartitionStates(Collections.emptyMap());
- restorePartitionStates(Collections.emptyMap(), restoreGroups);
+ if (cacheGroup.localStartVersion().equals(fut.initialVersion()))
+ cacheGroup.topology().afterStateRestored(fut.initialVersion());
+ }
+ finally {
+ cctx.database().checkpointReadUnlock();
+ }
+ }
+ );
}
if (cctx.kernalContext().query().moduleEnabled()) {
@@ -1366,11 +1379,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
}
-
- if (log.isInfoEnabled())
- log.info("Logical recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
-
- return restored;
}
/**
@@ -1661,52 +1669,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
}
- /**
- * Restores from last checkpoint and applies WAL changes since this checkpoint.
- *
- * @throws IgniteCheckedException If failed to restore database status from WAL.
- */
- private void restoreState() throws IgniteCheckedException {
- try {
- CheckpointStatus status = readCheckpointStatus();
-
- checkpointReadLock();
-
- try {
- applyLastUpdates(status, false);
- }
- finally {
- checkpointReadUnlock();
- }
-
- snapshotMgr.restoreState();
- }
- catch (StorageException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /**
- * Called when all partitions have been fully restored and pre-created on node start.
- *
- * Starts checkpointing process and initiates first checkpoint.
- *
- * @throws IgniteCheckedException If first checkpoint has failed.
- */
- @Override public void onStateRestored() throws IgniteCheckedException {
- long time = System.currentTimeMillis();
-
- new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer).start();
-
- CheckpointProgressSnapshot chp = checkpointer.wakeupForCheckpoint(0, "node started");
-
- if (chp != null)
- chp.cpBeginFut.get();
-
- if (log.isInfoEnabled())
- log.info("Checkpointer initilialzation performed in " + (System.currentTimeMillis() - time) + " ms.");
- }
-
/** {@inheritDoc} */
@Override public synchronized Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() {
assert reservedForExchange == null : reservedForExchange;
@@ -2005,39 +1967,146 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (kctx.clientNode())
return;
- // Preform early regions startup before restoring state.
- initAndStartRegions(kctx.config().getDataStorageConfiguration());
+ checkpointReadLock();
+
+ try {
+ // Preform early regions startup before restoring state.
+ initAndStartRegions(kctx.config().getDataStorageConfiguration());
+
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx))
+ lsnr.beforeBinaryMemoryRestore(this);
+
+ log.info("Starting binary memory restore for: " + cctx.cache().cacheGroupDescriptors().keySet());
+
+ cctx.pageStore().initializeForMetastorage();
+
+ // Restore binary memory for all not WAL disabled cache groups.
+ WALPointer restored = restoreBinaryMemory(
+ g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g)
+ );
+
+ if (restored != null)
+ U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restored + ']');
+
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx))
+ lsnr.afterBinaryMemoryRestore(this);
+
+ cctx.wal().resumeLogging(restored);
+
+ // We should log this record to ensure that node start marker pointer will be found in compacted segment.
+ cctx.wal().log(new MemoryRecoveryRecord(System.currentTimeMillis()));
- // Only presistence caches to start.
- for (DynamicCacheDescriptor desc : cctx.cache().cacheDescriptors().values()) {
- if (CU.isPersistentCache(desc.cacheConfiguration(), cctx.gridConfig().getDataStorageConfiguration()))
- storeMgr.initializeForCache(desc.groupDescriptor(), new StoredCacheData(desc.cacheConfiguration()));
+ assert metaStorage == null;
+
+ metaStorage = createMetastorage(false);
+
+ CheckpointStatus status = readCheckpointStatus();
+
+ RestoreLogicalState logicalState = applyLogicalUpdates(
+ status,
+ g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g),
+ true
+ );
+
+ // Restore state for all groups.
+ restorePartitionStates(cctx.cache().cacheGroups(), logicalState.partitionRecoveryStates);
+
+ walTail = tailPointer(logicalState.lastRead);
+
+ cctx.wal().onDeActivate(kctx);
}
+ catch (IgniteCheckedException e) {
+ releaseFileLock();
- final WALPointer restoredPtr = restoreBinaryMemory(cctx.cache().cacheGroupDescriptors().keySet());
+ throw e;
+ }
+ finally {
+ checkpointReadUnlock();
+ }
+ }
- walTail = restoredPtr;
+ /**
+ * Calculates tail pointer for WAL at the end of logical recovery: replays WAL from the given pointer and returns the pointer following the last record pointer that was read.
+ *
+ * @param from Pointer to start WAL replay from; it is also used as the last read pointer when the replay yields no records.
+ * @return Pointer next to the last read one, or {@code null} if no last read pointer is available.
+ * @throws IgniteCheckedException If failed.
+ */
+ private WALPointer tailPointer(WALPointer from) throws IgniteCheckedException {
+ WALPointer lastRead = from;
+
+ try (WALIterator it = cctx.wal().replay(from)) {
+ while (it.hasNextX()) {
+ IgniteBiTuple<WALPointer, WALRecord> rec = it.nextX();
- if (restoredPtr != null)
- U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restoredPtr + ']');
+ if (rec == null)
+ break;
+
+ lastRead = rec.get1();
+ }
+ }
+
+ return lastRead != null ? lastRead.next() : null;
+ }
+
+ /**
+ * Restores partition states for the given cache groups and logs how many groups and partitions were processed.
+ *
+ * @param forGroups Cache groups to restore partition states for.
+ * @param partitionStates Partition recovery states handed to each group's {@code restorePartitionStates} call.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void restorePartitionStates(
+ Collection<CacheGroupContext> forGroups,
+ Map<GroupPartitionId, PartitionRecoverState> partitionStates
+ ) throws IgniteCheckedException {
+ long startRestorePart = U.currentTimeMillis();
+
+ if (log.isInfoEnabled())
+ log.info("Restoring partition state for local groups.");
+
+ // Sum of the per-group counts returned by restorePartitionStates.
+ long totalProcessed = 0;
+
+ for (CacheGroupContext grp : forGroups)
+ totalProcessed += grp.restorePartitionStates(partitionStates);
+
+ // Every field is rendered as "key=value" and separated by ", " so the summary stays parseable.
+ if (log.isInfoEnabled())
+ log.info("Finished restoring partition state for local groups [" +
+ "groupsProcessed=" + forGroups.size() +
+ ", partitionsProcessed=" + totalProcessed +
+ ", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]");
+ }
+
+ /**
+ * Invoked once all partitions have been fully restored and pre-created on node start.
+ *
+ * Launches the checkpointer thread, requests the first checkpoint and waits on its {@code cpBeginFut}.
+ *
+ * @param topVer Topology version (not used by this implementation).
+ * @throws IgniteCheckedException If first checkpoint has failed.
+ */
+ @Override public void onStateRestored(AffinityTopologyVersion topVer) throws IgniteCheckedException {
+ final long startTs = System.currentTimeMillis();
+
+ IgniteThread cpThread = new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer);
+
+ cpThread.start();
+
+ CheckpointProgressSnapshot firstCp = checkpointer.wakeupForCheckpoint(0, "node started");
+
+ // A null snapshot means there is nothing to wait for.
+ if (firstCp != null)
+ firstCp.cpBeginFut.get();
+
+ if (!log.isInfoEnabled())
+ return;
+
+ log.info("Checkpointer initilialzation performed in " + (System.currentTimeMillis() - startTs) + " ms.");
}
/**
* @param status Checkpoint status.
- * @param metastoreOnly If {@code True} restores Metastorage only.
- * @param storePageMem Metastore page memory.
- * @param cacheGrps Cache groups to restore.
+ * @param cacheGroupsPredicate Cache groups to restore.
* @throws IgniteCheckedException If failed.
* @throws StorageException In case I/O error occurred during operations with storage.
*/
- @Nullable private WALPointer restoreMemory(
+ private WALPointer performBinaryMemoryRestore(
CheckpointStatus status,
- boolean metastoreOnly,
- PageMemoryEx storePageMem,
- Set<Integer> cacheGrps
+ Predicate<Integer> cacheGroupsPredicate,
+ boolean finalizeState
) throws IgniteCheckedException {
- assert !metastoreOnly || storePageMem != null;
-
if (log.isInfoEnabled())
log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked="
+ status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
@@ -2045,8 +2114,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
boolean apply = status.needRestoreMemory();
if (apply) {
- U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " +
- "finish checkpoint on node start.");
+ if (finalizeState)
+ U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " +
+ "finish checkpoint on node start.");
cctx.pageStore().beginRecover();
}
@@ -2057,16 +2127,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
long lastArchivedSegment = cctx.wal().lastArchivedSegment();
- RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, lastArchivedSegment, log);
-
- // Always perform recovery at least meta storage cache.
- Set<Integer> restoreGrps = new HashSet<>(Collections.singletonList(METASTORAGE_CACHE_ID));
-
- if (!metastoreOnly && !F.isEmpty(cacheGrps)) {
- restoreGrps.addAll(cacheGrps.stream()
- .filter(g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g))
- .collect(Collectors.toSet()));
- }
+ RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, lastArchivedSegment, cacheGroupsPredicate);
int applied = 0;
@@ -2086,29 +2147,30 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// several repetitive restarts and the same pages may have changed several times.
int grpId = pageRec.fullPageId().groupId();
- if (restoreGrps.contains(grpId)) {
- long pageId = pageRec.fullPageId().pageId();
+ long pageId = pageRec.fullPageId().pageId();
- PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
+ PageMemoryEx pageMem = getPageMemoryForCacheGroup(grpId);
- long page = pageMem.acquirePage(grpId, pageId, true);
+ if (pageMem == null)
+ break;
- try {
- long pageAddr = pageMem.writeLock(grpId, pageId, page);
+ long page = pageMem.acquirePage(grpId, pageId, true);
- try {
- PageUtils.putBytes(pageAddr, 0, pageRec.pageData());
- }
- finally {
- pageMem.writeUnlock(grpId, pageId, page, null, true, true);
- }
+ try {
+ long pageAddr = pageMem.writeLock(grpId, pageId, page);
+
+ try {
+ PageUtils.putBytes(pageAddr, 0, pageRec.pageData());
}
finally {
- pageMem.releasePage(grpId, pageId, page);
+ pageMem.writeUnlock(grpId, pageId, page, null, true, true);
}
-
- applied++;
}
+ finally {
+ pageMem.releasePage(grpId, pageId, page);
+ }
+
+ applied++;
}
break;
@@ -2119,9 +2181,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
{
int grpId = metaStateRecord.groupId();
- if (!restoreGrps.contains(grpId))
- continue;
-
int partId = metaStateRecord.partitionId();
GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(metaStateRecord.state());
@@ -2140,10 +2199,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
{
int grpId = destroyRecord.groupId();
- if (!restoreGrps.contains(grpId))
- continue;
+ PageMemoryEx pageMem = getPageMemoryForCacheGroup(grpId);
- PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
+ if (pageMem == null)
+ break;
pageMem.invalidate(grpId, destroyRecord.partitionId());
@@ -2158,37 +2217,38 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int grpId = r.groupId();
- if (restoreGrps.contains(grpId)) {
- long pageId = r.pageId();
+ long pageId = r.pageId();
- PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
+ PageMemoryEx pageMem = getPageMemoryForCacheGroup(grpId);
- // Here we do not require tag check because we may be applying memory changes after
- // several repetitive restarts and the same pages may have changed several times.
- long page = pageMem.acquirePage(grpId, pageId, true);
+ if (pageMem == null)
+ break;
- try {
- long pageAddr = pageMem.writeLock(grpId, pageId, page);
+ // Here we do not require tag check because we may be applying memory changes after
+ // several repetitive restarts and the same pages may have changed several times.
+ long page = pageMem.acquirePage(grpId, pageId, true);
- try {
- r.applyDelta(pageMem, pageAddr);
- }
- finally {
- pageMem.writeUnlock(grpId, pageId, page, null, true, true);
- }
+ try {
+ long pageAddr = pageMem.writeLock(grpId, pageId, page);
+
+ try {
+ r.applyDelta(pageMem, pageAddr);
}
finally {
- pageMem.releasePage(grpId, pageId, page);
+ pageMem.writeUnlock(grpId, pageId, page, null, true, true);
}
-
- applied++;
}
+ finally {
+ pageMem.releasePage(grpId, pageId, page);
+ }
+
+ applied++;
}
}
}
}
- if (metastoreOnly)
+ if (!finalizeState)
return null;
WALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer();
@@ -2208,7 +2268,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cpHistory.initialize(retreiveHistory());
- return lastReadPtr == null ? null : lastReadPtr.next();
+ return lastReadPtr != null ? lastReadPtr.next() : null;
}
/**
@@ -2219,6 +2279,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @throws IgniteCheckedException if no DataRegion is configured for a name obtained from cache descriptor.
*/
private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
+ if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
+ return (PageMemoryEx)dataRegion(METASTORE_DATA_REGION_NAME).pageMemory();
+
// TODO IGNITE-7792 add generic mapping.
if (grpId == TxLog.TX_LOG_CACHE_ID)
return (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();
@@ -2229,7 +2292,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
CacheGroupDescriptor desc = sharedCtx.cache().cacheGroupDescriptors().get(grpId);
if (desc == null)
- throw new IgniteCheckedException("Failed to find cache group descriptor [grpId=" + grpId + ']');
+ return null;
String memPlcName = desc.config().getDataRegionName();
@@ -2242,13 +2305,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param it WalIterator.
* @param recPredicate Wal record filter.
* @param entryPredicate Entry filter.
- * @param partStates Partition to restore state.
+ * @param partitionRecoveryStates Partition states to restore, keyed by group-partition id.
*/
public void applyUpdatesOnRecovery(
@Nullable WALIterator it,
IgnitePredicate<IgniteBiTuple<WALPointer, WALRecord>> recPredicate,
IgnitePredicate<DataEntry> entryPredicate,
- Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
+ Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates
) throws IgniteCheckedException {
cctx.walState().runWithOutWAL(() -> {
if (it != null) {
@@ -2306,7 +2369,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointReadLock();
try {
- restorePartitionStates(partStates, null);
+ restorePartitionStates(cctx.cache().cacheGroups(), partitionRecoveryStates);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -2319,31 +2382,30 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* @param status Last registered checkpoint status.
- * @param metastoreOnly If {@code True} only records related to metastorage will be processed.
* @throws IgniteCheckedException If failed to apply updates.
* @throws StorageException If IO exception occurred while reading write-ahead log.
*/
- private void applyLastUpdates(CheckpointStatus status, boolean metastoreOnly) throws IgniteCheckedException {
+ private RestoreLogicalState applyLogicalUpdates(
+ CheckpointStatus status,
+ Predicate<Integer> cacheGroupsPredicate,
+ boolean skipFieldLookup
+ ) throws IgniteCheckedException {
if (log.isInfoEnabled())
log.info("Applying lost cache updates since last checkpoint record [lastMarked="
+ status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
- if (!metastoreOnly)
+ if (skipFieldLookup)
cctx.kernalContext().query().skipFieldLookup(true);
long lastArchivedSegment = cctx.wal().lastArchivedSegment();
- RestoreLogicalState restoreLogicalState = new RestoreLogicalState(lastArchivedSegment, log);
+ RestoreLogicalState restoreLogicalState = new RestoreLogicalState(lastArchivedSegment, cacheGroupsPredicate);
long start = U.currentTimeMillis();
- int applied = 0;
- Collection<Integer> ignoreGrps = metastoreOnly ? Collections.emptySet() :
- F.concat(false, initiallyGlobalWalDisabledGrps, initiallyLocalWalDisabledGrps);
+ int applied = 0;
try (WALIterator it = cctx.wal().replay(status.startPtr)) {
- Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new HashMap<>();
-
while (it.hasNextX()) {
WALRecord rec = restoreLogicalState.next(it);
@@ -2352,9 +2414,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
switch (rec.type()) {
case DATA_RECORD:
- if (metastoreOnly)
- continue;
-
DataRecord dataRec = (DataRecord)rec;
for (DataEntry dataEntry : dataRec.writeEntries()) {
@@ -2366,27 +2425,27 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (cacheDesc == null)
continue;
- if (!ignoreGrps.contains(cacheDesc.groupId())) {
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
- applyUpdate(cacheCtx, dataEntry);
+ applyUpdate(cacheCtx, dataEntry);
- applied++;
- }
+ applied++;
}
break;
case PART_META_UPDATE_STATE:
- if (metastoreOnly)
- continue;
-
PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;
- if (!ignoreGrps.contains(metaStateRecord.groupId())) {
- partStates.put(new T2<>(metaStateRecord.groupId(), metaStateRecord.partitionId()),
- new T2<>((int)metaStateRecord.state(), metaStateRecord.updateCounter()));
- }
+ GroupPartitionId groupPartitionId = new GroupPartitionId(
+ metaStateRecord.groupId(), metaStateRecord.partitionId()
+ );
+
+ PartitionRecoverState state = new PartitionRecoverState(
+ (int)metaStateRecord.state(), metaStateRecord.updateCounter()
+ );
+
+ restoreLogicalState.partitionRecoveryStates.put(groupPartitionId, state);
break;
@@ -2400,13 +2459,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
- if (metastoreOnly)
- continue;
-
PageDeltaRecord rec0 = (PageDeltaRecord) rec;
PageMemoryEx pageMem = getPageMemoryForCacheGroup(rec0.groupId());
+ if (pageMem == null)
+ break;
+
long page = pageMem.acquirePage(rec0.groupId(), rec0.pageId(), true);
try {
@@ -2429,166 +2488,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// Skip other records.
}
}
-
- if (!metastoreOnly) {
- long startRestorePart = U.currentTimeMillis();
-
- if (log.isInfoEnabled())
- log.info("Restoring partition state for local groups [cntPartStateWal="
- + partStates.size() + ", lastCheckpointId=" + status.cpStartId + ']');
-
- long proc = restorePartitionStates(partStates, null);
-
- if (log.isInfoEnabled())
- log.info("Finished restoring partition state for local groups [cntProcessed=" + proc +
- ", cntPartStateWal=" + partStates.size() +
- ", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]");
- }
}
finally {
- if (!metastoreOnly)
+ if (skipFieldLookup)
cctx.kernalContext().query().skipFieldLookup(false);
}
if (log.isInfoEnabled())
log.info("Finished applying WAL changes [updatesApplied=" + applied +
", time=" + (U.currentTimeMillis() - start) + " ms]");
- }
- /**
- * Initializes not empty partitions and restores their state from page memory or WAL.
- * Partition states presented in page memory may be overriden by states restored from WAL {@code partStates}.
- *
- * @param partStates Partition states restored from WAL.
- * @param onlyForGroups If not {@code null} restore states only for specified cache groups.
- * @return cntParts Count of partitions processed.
- * @throws IgniteCheckedException If failed to restore partition states.
- */
- private long restorePartitionStates(
- Map<T2<Integer, Integer>, T2<Integer, Long>> partStates,
- @Nullable Set<Integer> onlyForGroups
- ) throws IgniteCheckedException {
- long cntParts = 0;
-
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal() || !grp.affinityNode()) {
- // Local cache has no partitions and its states.
- continue;
- }
-
- if (!grp.dataRegion().config().isPersistenceEnabled())
- continue;
-
- if (onlyForGroups != null && !onlyForGroups.contains(grp.groupId()))
- continue;
-
- int grpId = grp.groupId();
-
- PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
-
- for (int i = 0; i < grp.affinity().partitions(); i++) {
- T2<Integer, Long> restore = partStates.get(new T2<>(grpId, i));
-
- if (storeMgr.exists(grpId, i)) {
- storeMgr.ensure(grpId, i);
-
- if (storeMgr.pages(grpId, i) <= 1)
- continue;
-
- if (log.isDebugEnabled())
- log.debug("Creating partition on recovery (exists in page store) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + i + "]");
-
- GridDhtLocalPartition part = grp.topology().forceCreatePartition(i);
-
- assert part != null;
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-6097
- grp.offheap().onPartitionInitialCounterUpdated(i, 0);
-
- checkpointReadLock();
-
- try {
- long partMetaId = pageMem.partitionMetaPageId(grpId, i);
- long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
-
- try {
- long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
-
- boolean changed = false;
-
- try {
- PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
-
- if (restore != null) {
- int stateId = restore.get1();
-
- io.setPartitionState(pageAddr, (byte)stateId);
-
- changed = updateState(part, stateId);
-
- if (stateId == GridDhtPartitionState.OWNING.ordinal()
- || (stateId == GridDhtPartitionState.MOVING.ordinal()
- && part.initialUpdateCounter() < restore.get2())) {
- part.initialUpdateCounter(restore.get2());
-
- changed = true;
- }
-
- if (log.isDebugEnabled())
- log.debug("Restored partition state (from WAL) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + i + ", state=" + part.state() +
- "updCntr=" + part.initialUpdateCounter() + "]");
- }
- else {
- changed = updateState(part, (int) io.getPartitionState(pageAddr));
-
- if (log.isDebugEnabled())
- log.debug("Restored partition state (from page memory) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + i + ", state=" + part.state() +
- "updCntr=" + part.initialUpdateCounter() + "]");
- }
- }
- finally {
- pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
- }
- }
- finally {
- pageMem.releasePage(grpId, partMetaId, partMetaPage);
- }
- }
- finally {
- checkpointReadUnlock();
- }
- }
- else if (restore != null) {
- if (log.isDebugEnabled())
- log.debug("Creating partition on recovery (exists in WAL) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + i + "]");
-
- GridDhtLocalPartition part = grp.topology().forceCreatePartition(i);
-
- assert part != null;
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-6097
- grp.offheap().onPartitionInitialCounterUpdated(i, 0);
-
- updateState(part, restore.get1());
-
- if (log.isDebugEnabled())
- log.debug("Restored partition state (from WAL) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + i + ", state=" + part.state() +
- "updCntr=" + part.initialUpdateCounter() + "]");
- }
-
- cntParts++;
- }
-
- // After partition states are restored, it is necessary to update internal data structures in topology.
- grp.topology().afterStateRestored(grp.topology().lastTopologyChangeVersion());
- }
-
- return cntParts;
+ return restoreLogicalState;
}
/**
@@ -2604,25 +2514,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- * @param part Partition to restore state for.
- * @param stateId State enum ordinal.
- * @return Updated flag.
- */
- private boolean updateState(GridDhtLocalPartition part, int stateId) {
- if (stateId != -1) {
- GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
-
- assert state != null;
-
- part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state);
-
- return true;
- }
-
- return false;
- }
-
- /**
* @param cacheCtx Cache context to apply an update.
* @param dataEntry Data entry to apply.
* @throws IgniteCheckedException If failed to restore.
@@ -3143,6 +3034,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
while (!isCancelled()) {
waitCheckpointEvent();
+ if (skipCheckpointOnNodeStop && (isStopping() || shutdownNow)) {
+ if (log.isInfoEnabled())
+ log.warning("Skipping last checkpoint because node is stopping.");
+
+ return;
+ }
+
GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied;
if (enableChangeApplied != null) {
@@ -3574,7 +3472,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (req != null)
req.waitCompleted();
- if (log.isDebugEnabled())
+ if (req != null && log.isDebugEnabled())
log.debug("Partition file destroy has cancelled [grpId=" + grpId + ", partId=" + partId + "]");
}
@@ -4137,7 +4035,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
else {
CacheGroupContext grp = context().cache().cacheGroup(grpId);
- DataRegion region = grp != null ?grp .dataRegion() : null;
+ DataRegion region = grp != null ? grp.dataRegion() : null;
if (region == null || !region.config().isPersistenceEnabled())
continue;
@@ -4645,10 +4543,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*
*/
private void fillWalDisabledGroups() {
- MetaStorage meta = cctx.database().metaStorage();
+ assert metaStorage != null;
try {
- Set<String> keys = meta.readForPredicate(WAL_KEY_PREFIX_PRED).keySet();
+ Set<String> keys = metaStorage.readForPredicate(WAL_KEY_PREFIX_PRED).keySet();
if (keys.isEmpty())
return;
@@ -4713,23 +4611,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Abstract class for create restore context.
*/
- public abstract static class RestoreStateContext {
- /** */
- protected final IgniteLogger log;
-
+ private abstract class RestoreStateContext {
/** Last archived segment. */
protected final long lastArchivedSegment;
/** Last read record WAL pointer. */
protected FileWALPointer lastRead;
+ /** Only {@link WalRecordCacheGroupAware} records satisfying this predicate will be applied. */
+ private final Predicate<Integer> cacheGroupPredicate;
+
+ /** Set to {@code true} if data records should be skipped. */
+ private final boolean skipDataRecords;
+
/**
* @param lastArchivedSegment Last archived segment index.
- * @param log Ignite logger.
*/
- public RestoreStateContext(long lastArchivedSegment, IgniteLogger log) {
+ public RestoreStateContext(long lastArchivedSegment, Predicate<Integer> cacheGroupPredicate, boolean skipDataRecords) {
this.lastArchivedSegment = lastArchivedSegment;
- this.log = log;
+ this.cacheGroupPredicate = cacheGroupPredicate;
+ this.skipDataRecords = skipDataRecords;
}
/**
@@ -4741,17 +4642,63 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*/
public WALRecord next(WALIterator it) throws IgniteCheckedException {
try {
- IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
+ for (;;) {
+ if (!it.hasNextX())
+ return null;
- WALRecord rec = tup.get2();
+ IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
- WALPointer ptr = tup.get1();
+ if (tup == null)
+ return null;
- lastRead = (FileWALPointer)ptr;
+ WALRecord rec = tup.get2();
- rec.position(ptr);
+ WALPointer ptr = tup.get1();
- return rec;
+ lastRead = (FileWALPointer)ptr;
+
+ rec.position(ptr);
+
+ // Filter out records.
+ if (rec instanceof WalRecordCacheGroupAware) {
+ WalRecordCacheGroupAware groupAwareRecord = (WalRecordCacheGroupAware) rec;
+
+ if (!cacheGroupPredicate.test(groupAwareRecord.groupId()))
+ continue;
+ }
+
+ switch (rec.type()) {
+ case METASTORE_DATA_RECORD:
+ case DATA_RECORD:
+ if (skipDataRecords)
+ continue;
+
+ if (rec instanceof DataRecord) {
+ DataRecord dataRecord = (DataRecord) rec;
+
+ // Filter data entries by group id.
+ List<DataEntry> filteredEntries = dataRecord.writeEntries().stream()
+ .filter(entry -> {
+ if (entry == null)
+ return false;
+
+ int cacheId = entry.cacheId();
+
+ return cctx != null && cctx.cacheContext(cacheId) != null && cacheGroupPredicate.test(cctx.cacheContext(cacheId).groupId());
+ })
+ .collect(Collectors.toList());
+
+ dataRecord.setWriteEntries(filteredEntries);
+ }
+
+ break;
+
+ default:
+ break;
+ }
+
+ return rec;
+ }
}
catch (IgniteCheckedException e) {
boolean throwsCRCError = throwsCRCError();
@@ -4791,7 +4738,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Restore memory context. Tracks the safety of binary recovery.
*/
- public static class RestoreBinaryState extends RestoreStateContext {
+ private class RestoreBinaryState extends RestoreStateContext {
/** Checkpoint status. */
private final CheckpointStatus status;
@@ -4801,13 +4748,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* @param status Checkpoint status.
* @param lastArchivedSegment Last archived segment index.
- * @param log Ignite logger.
*/
- public RestoreBinaryState(CheckpointStatus status, long lastArchivedSegment, IgniteLogger log) {
- super(lastArchivedSegment, log);
+ public RestoreBinaryState(CheckpointStatus status, long lastArchivedSegment, Predicate<Integer> cacheGroupsPredicate) {
+ super(lastArchivedSegment, cacheGroupsPredicate, true);
this.status = status;
- needApplyBinaryUpdates = status.needRestoreMemory();
+ this.needApplyBinaryUpdates = status.needRestoreMemory();
}
/**
@@ -4867,13 +4813,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Restore logical state context. Tracks the safety of logical recovery.
*/
- public static class RestoreLogicalState extends RestoreStateContext {
+ private class RestoreLogicalState extends RestoreStateContext {
+ /** States of partitions recovered during applying logical updates. */
+ private final Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates = new HashMap<>();
+
/**
* @param lastArchivedSegment Last archived segment index.
- * @param log Ignite logger.
*/
- public RestoreLogicalState(long lastArchivedSegment, IgniteLogger log) {
- super(lastArchivedSegment, log);
+ public RestoreLogicalState(long lastArchivedSegment, Predicate<Integer> cacheGroupsPredicate) {
+ super(lastArchivedSegment, cacheGroupsPredicate, false);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 21bd454..4966bca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.management.InstanceNotFoundException;
import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.DataStorageMetrics;
@@ -45,6 +46,7 @@ import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
@@ -91,13 +93,19 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
private static final long MAX_PAGE_MEMORY_INIT_SIZE_32_BIT = 2L * 1024 * 1024 * 1024;
/** {@code True} to reuse memory on deactive. */
- private final boolean reuseMemory = IgniteSystemProperties.getBoolean(IGNITE_REUSE_MEMORY_ON_DEACTIVATE);
+ protected final boolean reuseMemory = IgniteSystemProperties.getBoolean(IGNITE_REUSE_MEMORY_ON_DEACTIVATE);
+
+ /** Data regions keyed by data region name. */
+ protected final Map<String, DataRegion> dataRegionMap = new ConcurrentHashMap<>();
+
+ /** Stores memory providers eligible for reuse. */
+ private final Map<String, DirectMemoryProvider> memProviderMap = new ConcurrentHashMap<>();
/** */
private static final String MBEAN_GROUP_NAME = "DataRegionMetrics";
/** */
- protected volatile Map<String, DataRegion> dataRegionMap;
+ protected final Map<String, DataRegionMetrics> memMetricsMap = new ConcurrentHashMap<>();
/** */
private volatile boolean dataRegionsInitialized;
@@ -106,9 +114,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
private volatile boolean dataRegionsStarted;
/** */
- protected Map<String, DataRegionMetrics> memMetricsMap;
-
- /** */
protected DataRegion dfltDataRegion;
/** */
@@ -123,8 +128,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
/** First eviction was warned flag. */
private volatile boolean firstEvictWarn;
- /** Stores memory providers eligible for reuse. */
- private Map<String, DirectMemoryProvider> memProviderMap;
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
@@ -267,6 +270,17 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
+ * Starts the page memory and the eviction tracker of every data region in {@link #dataRegionMap}.
+ */
+ private void startDataRegions() {
+ for (DataRegion region : dataRegionMap.values()) {
+ region.pageMemory().start();
+
+ region.evictionTracker().start();
+ }
+ }
+
+ /**
* @param memCfg Database config.
* @throws IgniteCheckedException If failed to initialize swap path.
*/
@@ -288,12 +302,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
protected void initDataRegions0(DataStorageConfiguration memCfg) throws IgniteCheckedException {
DataRegionConfiguration[] dataRegionCfgs = memCfg.getDataRegionConfigurations();
- int dataRegions = dataRegionCfgs == null ? 0 : dataRegionCfgs.length;
-
- dataRegionMap = U.newHashMap(3 + dataRegions);
- memMetricsMap = U.newHashMap(3 + dataRegions);
- memProviderMap = reuseMemory ? U.newHashMap(3 + dataRegions) : null;
-
if (dataRegionCfgs != null) {
for (DataRegionConfiguration dataRegionCfg : dataRegionCfgs)
addDataRegion(memCfg, dataRegionCfg, dataRegionCfg.isPersistenceEnabled());
@@ -346,14 +354,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(dataRegionCfg, freeSpaceProvider(dataRegionCfg));
- DataRegion memPlc = initMemory(dataStorageCfg, dataRegionCfg, memMetrics, trackable);
+ DataRegion region = initMemory(dataStorageCfg, dataRegionCfg, memMetrics, trackable);
- dataRegionMap.put(dataRegionName, memPlc);
+ dataRegionMap.put(dataRegionName, region);
memMetricsMap.put(dataRegionName, memMetrics);
if (dataRegionName.equals(dfltMemPlcName))
- dfltDataRegion = memPlc;
+ dfltDataRegion = region;
else if (dataRegionName.equals(DFLT_DATA_REG_DEFAULT_NAME))
U.warn(log, "Data Region with name 'default' isn't used as a default. " +
"Please, check Data Region configuration.");
@@ -689,13 +697,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
- * @throws IgniteCheckedException If fails.
- */
- public void onDoneRestoreBinaryMemory() throws IgniteCheckedException {
- // No-op.
- }
-
- /**
* Creates file with current timestamp and specific "node-started.bin" suffix
* and writes into memory recovery pointer.
*
@@ -729,7 +730,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
if (memPlcName == null)
return dfltDataRegion;
- if (dataRegionMap == null)
+ if (dataRegionMap.isEmpty())
return null;
DataRegion plc;
@@ -856,11 +857,9 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
/**
* @param discoEvt Before exchange for the given discovery event.
- *
- * @return {@code True} if partitions have been restored from persistent storage.
*/
- public boolean beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws IgniteCheckedException {
- return false;
+ public void beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws IgniteCheckedException {
+ // No-op.
}
/**
@@ -878,7 +877,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
*
* @throws IgniteCheckedException If failed.
*/
- public void onStateRestored() throws IgniteCheckedException {
+ public void onStateRestored(AffinityTopologyVersion topVer) throws IgniteCheckedException {
// No-op.
}
@@ -1026,7 +1025,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
*
* @return {@code True} if policy supports memory reuse.
*/
- private boolean supportsMemoryReuse(DataRegionConfiguration plcCfg) {
+ public boolean supportsMemoryReuse(DataRegionConfiguration plcCfg) {
return reuseMemory && plcCfg.getSwapPath() == null;
}
@@ -1209,11 +1208,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
registerMetricsMBeans(cctx.gridConfig());
- for (DataRegion memPlc : dataRegionMap.values()) {
- memPlc.pageMemory().start();
-
- memPlc.evictionTracker().start();
- }
+ startDataRegions();
initPageMemoryDataStructures(cfg);
@@ -1234,33 +1229,26 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
lsnr.beforeStop(this);
- if (dataRegionMap != null) {
- for (DataRegion memPlc : dataRegionMap.values()) {
- memPlc.pageMemory().stop(shutdown);
+ for (DataRegion region : dataRegionMap.values()) {
+ region.pageMemory().stop(shutdown);
- memPlc.evictionTracker().stop();
+ region.evictionTracker().stop();
- unregisterMetricsMBean(
- cctx.gridConfig(),
- MBEAN_GROUP_NAME,
- memPlc.memoryMetrics().getName()
- );
+ unregisterMetricsMBean(
+ cctx.gridConfig(),
+ MBEAN_GROUP_NAME,
+ region.memoryMetrics().getName()
+ );
}
- dataRegionMap.clear();
+ dataRegionMap.clear();
- dataRegionMap = null;
+ if (shutdown && memProviderMap != null)
+ memProviderMap.clear();
- if (shutdown && memProviderMap != null) {
- memProviderMap.clear();
+ dataRegionsInitialized = false;
- memProviderMap = null;
- }
-
- dataRegionsInitialized = false;
-
- dataRegionsStarted = false;
- }
+ dataRegionsStarted = false;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
index 104697e..e0c545b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
@@ -22,10 +22,6 @@ import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.OpenOption;
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.READ;
-import static java.nio.file.StandardOpenOption.WRITE;
-
/**
* File I/O factory which uses {@link AsynchronousFileChannel} based implementation of FileIO.
*/
@@ -37,11 +33,6 @@ public class AsyncFileIOFactory implements FileIOFactory {
private transient volatile ThreadLocal<AsyncFileIO.ChannelOpFuture> holder = initHolder();
/** {@inheritDoc} */
- @Override public FileIO create(File file) throws IOException {
- return create(file, CREATE, READ, WRITE);
- }
-
- /** {@inheritDoc} */
@Override public FileIO create(File file, OpenOption... modes) throws IOException {
if (holder == null) {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java
index 336aab6..b4b0389 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java
@@ -76,13 +76,6 @@ public class EncryptedFileIOFactory implements FileIOFactory {
}
/** {@inheritDoc} */
- @Override public FileIO create(File file) throws IOException {
- FileIO io = plainIOFactory.create(file);
-
- return new EncryptedFileIO(io, groupId, pageSize, headerSize, encMgr, encSpi);
- }
-
- /** {@inheritDoc} */
@Override public FileIO create(File file, OpenOption... modes) throws IOException {
FileIO io = plainIOFactory.create(file, modes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
index 2735185..b236000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
@@ -22,6 +22,10 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.file.OpenOption;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
/**
* {@link FileIO} factory definition.
*/
@@ -33,7 +37,9 @@ public interface FileIOFactory extends Serializable {
* @return File I/O interface.
* @throws IOException If I/O interface creation was failed.
*/
- public FileIO create(File file) throws IOException;
+ default FileIO create(File file) throws IOException{
+ return create(file, CREATE, READ, WRITE);
+ };
/**
* Creates I/O interface for file with specified mode.
@@ -43,5 +49,5 @@ public interface FileIOFactory extends Serializable {
* @return File I/O interface.
* @throws IOException If I/O interface creation was failed.
*/
- public FileIO create(File file, OpenOption... modes) throws IOException;
+ FileIO create(File file, OpenOption... modes) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index fdf4705..16d74c3 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -258,6 +258,10 @@ public class FilePageStore implements PageStore {
+ ", delete=" + delete + "]", e);
}
finally {
+ allocatedTracker.updateTotalAllocatedPages(-1L * allocated.getAndSet(0) / pageSize);
+
+ inited = false;
+
lock.writeLock().unlock();
}
}
@@ -542,7 +546,8 @@ public class FilePageStore implements PageStore {
long off = pageOffset(pageId);
assert (off >= 0 && off <= allocated.get()) || recover :
- "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ", pageId=" + U.hexLong(pageId);
+ "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) +
+ ", pageId=" + U.hexLong(pageId) + ", file=" + cfgFile.getPath();
assert pageBuf.capacity() == pageSize;
assert pageBuf.position() == 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
index 856ba1c..3fa3e2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
@@ -21,10 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.OpenOption;
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.READ;
-import static java.nio.file.StandardOpenOption.WRITE;
-
/**
* File I/O factory which provides RandomAccessFileIO implementation of FileIO.
*/
@@ -33,11 +29,6 @@ public class RandomAccessFileIOFactory implements FileIOFactory {
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override public FileIO create(File file) throws IOException {
- return create(file, CREATE, READ, WRITE);
- }
-
- /** {@inheritDoc} */
@Override public FileIO create(File file, OpenOption... modes) throws IOException {
return new RandomAccessFileIO(file, modes);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java
index 8ab418c..12cc446 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java
@@ -33,7 +33,7 @@ public interface MetastorageLifecycleListener {
*
* @param metastorage Read-only meta storage.
*/
- public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException;
+ default void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException { };
/**
* Fully functional metastore capable of performing reading and writing operations.
@@ -43,5 +43,5 @@ public interface MetastorageLifecycleListener {
*
* @param metastorage Fully functional meta storage.
*/
- public void onReadyForReadWrite(ReadWriteMetastorage metastorage) throws IgniteCheckedException;
+ default void onReadyForReadWrite(ReadWriteMetastorage metastorage) throws IgniteCheckedException { };
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionRecoverState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionRecoverState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionRecoverState.java
new file mode 100644
index 0000000..4c7e4d7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionRecoverState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.partstate;
+
+/**
+ * Immutable holder of a partition's state id and update counter during the recovery process.
+ */
+public class PartitionRecoverState {
+ /** State id. */
+ private final int stateId;
+
+ /** Update counter. */
+ private final long updateCounter;
+
+ /**
+ * @param stateId State id.
+ * @param updateCounter Update counter.
+ */
+ public PartitionRecoverState(int stateId, long updateCounter) {
+ this.stateId = stateId;
+ this.updateCounter = updateCounter;
+ }
+
+ /**
+ * @return State id passed to the constructor.
+ */
+ public int stateId() {
+ return stateId;
+ }
+
+ /**
+ * @return Update counter passed to the constructor.
+ */
+ public long updateCounter() {
+ return updateCounter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
index 16cc8f5..75aa983 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
@@ -94,10 +94,6 @@ public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends Gri
/**
*
*/
- public void restoreState() throws IgniteCheckedException {
- // No-op.
- }
-
public boolean snapshotOperationInProgress(){
return false;
}