You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/11/18 11:53:46 UTC
[ignite] branch master updated: IGNITE-13558 Partitions restore process parallelization on node startup - Fixes #9327.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 35a6e5b IGNITE-13558 Partitions restore process parallelization on node startup - Fixes #9327.
35a6e5b is described below
commit 35a6e5b0c228e86b637bc2dcfdbfcd166836671f
Author: denis-chudov <mo...@gmail.com>
AuthorDate: Thu Nov 18 14:49:08 2021 +0300
IGNITE-13558 Partitions restore process parallelization on node startup - Fixes #9327.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../processors/cache/GridCacheProcessor.java | 162 +++++++++++++------
.../cache/IgniteCacheOffheapManager.java | 23 ++-
.../cache/IgniteCacheOffheapManagerImpl.java | 17 +-
.../GridCacheDatabaseSharedManager.java | 2 +-
.../cache/persistence/GridCacheOffheapManager.java | 173 +++++++++++----------
.../cache/RestorePartitionStateTest.java | 8 +-
6 files changed, 239 insertions(+), 146 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5ff1d3e..346ca69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -41,7 +41,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -5552,66 +5552,75 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
- AtomicLong totalProcessed = new AtomicLong();
-
AtomicReference<IgniteCheckedException> restoreStateError = new AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
+ final int totalPart = forGroups.stream().mapToInt(grpCtx -> grpCtx.affinity().partitions()).sum();
- AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
+ CountDownLatch completionLatch = new CountDownLatch(totalPart);
- long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
+ Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new ConcurrentHashMap<>();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed = grp.offheap().restorePartitionStates(partStates);
+ final int topPartRefLimit = 5;
- totalProcessed.addAndGet(processed.size());
+ for (CacheGroupContext grpCtx : forGroups) {
+ for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+ final int partId = i;
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ sysPool.execute(() -> {
+ GroupPartitionId grpPartId = new GroupPartitionId(grpCtx.groupId(), partId);
- long ts = System.currentTimeMillis();
+ try {
+ long time = grpCtx.offheap().restoreStateOfPartition(partId, partStates.get(grpPartId));
- for (Map.Entry<Integer, Long> e : processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new GroupPartitionId(grp.groupId(), e.getKey())));
+ if (log.isInfoEnabled()) {
+ T3<Long, Long, GroupPartitionId> curPart = new T3<>(time, U.currentTimeMillis(), grpPartId);
- trimToSize(top, 5);
- }
+ RestorePartitionStateThreadContext threadCtx = threadCtxs.computeIfAbsent(
+ Thread.currentThread(),
+ t -> new RestorePartitionStateThreadContext()
+ );
+
+ Comparator<T3<Long, Long, GroupPartitionId>> cmp = processedPartitionComparator();
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ threadCtx.topPartRef.updateAndGet(prev -> {
+ if (prev == null ||
+ cmp.compare(prev.last(), curPart) < 0) {
+ SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(cmp);
- for (T3<Long, Long, GroupPartitionId> t2 : top0) {
- top.add(t2);
+ top.add(curPart);
- trimToSize(top, 5);
- }
+ if (prev != null)
+ top.addAll(prev);
- return top;
- });
+ trimToSize(top, topPartRefLimit);
+
+ return top;
+ }
+ else
+ return prev;
+ });
+
+ threadCtx.incrementProcessedCnt();
+ }
}
- }
- catch (IgniteCheckedException | RuntimeException | Error e) {
- U.error(log, "Failed to restore partition state for " +
- "groupName=" + grp.name() + " groupId=" + grp.groupId(), e);
+ catch (IgniteCheckedException | RuntimeException | Error e) {
+ U.error(log, "Failed to restore partition state for " +
+ "groupName=" + grpCtx.name() + " groupId=" + grpCtx.groupId(), e);
- restoreStateError.compareAndSet(
- null,
- e instanceof IgniteCheckedException
+ IgniteCheckedException ex = e instanceof IgniteCheckedException
? ((IgniteCheckedException)e)
- : new IgniteCheckedException(e)
- );
- }
- finally {
- completionLatch.countDown();
- }
- });
+ : new IgniteCheckedException(e);
+
+ if (!restoreStateError.compareAndSet(null, ex))
+ restoreStateError.get().addSuppressed(ex);
+ }
+ finally {
+ completionLatch.countDown();
+ }
+ });
+ }
}
boolean printTop = false;
@@ -5625,15 +5634,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
while (!completionLatch.await(timeout, TimeUnit.MILLISECONDS)) {
if (log.isInfoEnabled()) {
- @Nullable SortedSet<T3<Long, Long, GroupPartitionId>> top = topPartRef.get();
+ SortedSet<T3<Long, Long, GroupPartitionId>> top =
+ collectTopProcessedParts(threadCtxs.values(), topPartRefLimit);
+
+ long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum();
log.info("Restore partitions state progress [grpCnt=" +
(forGroups.size() - completionLatch.getCount()) + '/' + forGroups.size() +
- ", partitionCnt=" + totalProcessed.get() + '/' + totalPart + (top == null ? "" :
+ ", partitionCnt=" + totalProcessed + '/' + totalPart + (top.isEmpty() ? "" :
", topProcessedPartitions=" + toStringTopProcessingPartitions(top, forGroups)) + ']');
}
timeout = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS / 5;
+
printTop = true;
}
}
@@ -5642,16 +5655,22 @@ public class GridCacheProcessor extends GridProcessorAdapter {
throw new IgniteInterruptedException(e);
}
+ for (CacheGroupContext grpCtx : forGroups)
+ grpCtx.offheap().confirmPartitionStatesRestored();
+
// Checking error after all task applied.
if (restoreStateError.get() != null)
throw restoreStateError.get();
if (log.isInfoEnabled()) {
- SortedSet<T3<Long, Long, GroupPartitionId>> t = printTop ? topPartRef.get() : null;
+ SortedSet<T3<Long, Long, GroupPartitionId>> t =
+ printTop ? collectTopProcessedParts(threadCtxs.values(), topPartRefLimit) : null;
+
+ long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum();
log.info("Finished restoring partition state for local groups [" +
"groupsProcessed=" + forGroups.size() +
- ", partitionsProcessed=" + totalProcessed.get() +
+ ", partitionsProcessed=" + totalProcessed +
", time=" + U.humanReadableDuration(U.currentTimeMillis() - startRestorePart) +
(t == null ? "" : ", topProcessedPartitions=" + toStringTopProcessingPartitions(t, forGroups)) +
"]");
@@ -5659,6 +5678,30 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Collects the top processed partitions from the per-thread contexts of the partition state restore process.
+ *
+ * @param threadCtxs Thread local contexts.
+ * @param topPartRefLimit Limit of top partitions collection size.
+ */
+ private SortedSet<T3<Long, Long, GroupPartitionId>> collectTopProcessedParts(
+ Collection<RestorePartitionStateThreadContext> threadCtxs,
+ int topPartRefLimit
+ ) {
+ SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(processedPartitionComparator());
+
+ for (RestorePartitionStateThreadContext threadCtx : threadCtxs) {
+ SortedSet<T3<Long, Long, GroupPartitionId>> threadTop = threadCtx.topPartRef.get();
+
+ if (threadTop != null)
+ top.addAll(threadTop);
+ }
+
+ trimToSize(top, topPartRefLimit);
+
+ return top;
+ }
+
+ /**
* Start warming up sequentially for each persist data region.
*
* @throws IgniteCheckedException If failed.
@@ -5972,13 +6015,36 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* Comparator of processed partitions.
* T3 -> 1 - duration, 2 - timestamp, 3 - partition of group.
- * Sort order: duration -> timestamp -> partition of group.
+ * Sort order: duration -> timestamp (reversed order) -> partition of group.
*
* @return Comparator.
*/
static Comparator<T3<Long, Long, GroupPartitionId>> processedPartitionComparator() {
Comparator<T3<Long, Long, GroupPartitionId>> comp = Comparator.comparing(T3::get1);
- return comp.thenComparing(T3::get2).thenComparing(T3::get3);
+ return comp.thenComparing(T3::get2, Comparator.reverseOrder()).thenComparing(T3::get3);
+ }
+
+ /**
+ * Thread local context of restore partition state progress.
+ */
+ private static class RestorePartitionStateThreadContext {
+ /** Field updater. */
+ static final AtomicLongFieldUpdater<RestorePartitionStateThreadContext> PROCESSED_CNT_UPD =
+ AtomicLongFieldUpdater.newUpdater(RestorePartitionStateThreadContext.class, "processedCnt");
+
+ /** Top partitions by processing time. */
+ final AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef =
+ new AtomicReference<>();
+
+ /** Processed partitions count. It is always updated from the same thread. */
+ volatile long processedCnt = 0;
+
+ /**
+ * Increment {@code processedCnt} field.
+ */
+ void incrementProcessedCnt() {
+ PROCESSED_CNT_UPD.incrementAndGet(this);
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index bc08c15..d206164 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
@@ -90,15 +89,27 @@ public interface IgniteCacheOffheapManager {
public void stop();
/**
+ * Pre-creates a single partition that resides in page memory or WAL and restores its state.
+ *
+ * @param p Partition id.
+ * @param recoveryState Partition recovery state.
+ * @return Processing time in millis.
+ * @throws IgniteCheckedException If failed.
+ */
+ long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException;
+
+ /**
* Pre-create partitions that resides in page memory or WAL and restores their state.
*
- * @param partRecoveryStates Partition recovery states.
- * @return Processed partitions: partition id -> processing time in millis.
* @throws IgniteCheckedException If failed.
*/
- Map<Integer, Long> restorePartitionStates(
- Map<GroupPartitionId, Integer> partRecoveryStates
- ) throws IgniteCheckedException;
+ void restorePartitionStates() throws IgniteCheckedException;
+
+ /**
+ * Confirm that partition states are restored. This method should be called after restoring state of all partitions
+ * in group using {@link #restoreStateOfPartition(int, Integer)}.
+ */
+ void confirmPartitionStatesRestored();
/**
* @param entry Cache entry.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 8507b57..46ccede 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -68,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
@@ -264,10 +263,18 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public Map<Integer, Long> restorePartitionStates(
- Map<GroupPartitionId, Integer> partRecoveryStates
- ) throws IgniteCheckedException {
- return Collections.emptyMap(); // No-op.
+ @Override public long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restorePartitionStates() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void confirmPartitionStatesRestored() {
+ // No-op.
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a63fdf5..ed92638 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1434,7 +1434,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cctx.database().checkpointReadLock();
try {
- cacheGroup.offheap().restorePartitionStates(Collections.emptyMap());
+ cacheGroup.offheap().restorePartitionStates();
if (cacheGroup.localStartVersion().equals(fut.initialVersion()))
cacheGroup.topology().afterStateRestored(fut.initialVersion());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 2532ae5..1319ae1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -623,131 +623,140 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public Map<Integer, Long> restorePartitionStates(
- Map<GroupPartitionId, Integer> partRecoveryStates
- ) throws IgniteCheckedException {
+ @Override public long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException {
if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled()
|| partitionStatesRestored)
- return Collections.emptyMap();
-
- Map<Integer, Long> processed = new HashMap<>();
+ return 0;
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
- for (int p = 0; p < grp.affinity().partitions(); p++) {
- Integer recoverState = partRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
-
- long startTime = U.currentTimeMillis();
+ long startTime = U.currentTimeMillis();
- if (log.isDebugEnabled())
- log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
-
- if (ctx.pageStore().exists(grp.groupId(), p)) {
- ctx.pageStore().ensure(grp.groupId(), p);
+ long res = 0;
- if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
- if (log.isDebugEnabled()) {
- log.debug("Skipping partition on recovery (pages less than 1) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
- }
+ if (log.isDebugEnabled())
+ log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
- continue;
- }
+ if (ctx.pageStore().exists(grp.groupId(), p)) {
+ ctx.pageStore().ensure(grp.groupId(), p);
+ if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
if (log.isDebugEnabled()) {
- log.debug("Creating partition on recovery (exists in page store) " +
+ log.debug("Skipping partition on recovery (pages less than or equals 1) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
}
- GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+ return 0;
+ }
- // Triggers initialization of existing(having datafile) partition before acquiring cp read lock.
- part.dataStore().init();
+ if (log.isDebugEnabled()) {
+ log.debug("Creating partition on recovery (exists in page store) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+ }
- ctx.database().checkpointReadLock();
+ GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
- try {
- long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
- long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
+ // Triggers initialization of existing(having datafile) partition before acquiring cp read lock.
+ part.dataStore().init();
- try {
- long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
+ ctx.database().checkpointReadLock();
- boolean changed = false;
+ try {
+ long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
+ long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
- try {
- PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+ try {
+ long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
- if (recoverState != null) {
- changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue());
+ boolean changed = false;
- updateState(part, recoverState);
+ try {
+ PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
- if (log.isDebugEnabled()) {
- log.debug("Restored partition state (from WAL) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
- ", updCntr=" + part.initialUpdateCounter() +
- ", size=" + part.fullSize() + ']');
- }
- }
- else {
- int stateId = io.getPartitionState(pageAddr);
+ if (recoveryState != null) {
+ changed = io.setPartitionState(pageAddr, (byte)recoveryState.intValue());
- updateState(part, stateId);
+ updateState(part, recoveryState);
- if (log.isDebugEnabled()) {
- log.debug("Restored partition state (from page memory) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
- ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
- ", size=" + part.fullSize() + ']');
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Restored partition state (from WAL) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() +
+ ", size=" + part.fullSize() + ']');
}
}
- finally {
- pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
+ else {
+ int stateId = io.getPartitionState(pageAddr);
+
+ updateState(part, stateId);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Restored partition state (from page memory) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
+ ", size=" + part.fullSize() + ']');
+ }
}
}
finally {
- pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
+ pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
}
}
finally {
- ctx.database().checkpointReadUnlock();
+ pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
}
-
- processed.put(p, U.currentTimeMillis() - startTime);
}
- else if (recoverState != null) { // Pre-create partition if having valid state.
- GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
- updateState(part, recoverState);
+ res = U.currentTimeMillis() - startTime;
+ }
+ else if (recoveryState != null) { // Pre-create partition if having valid state.
+ GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
- processed.put(p, U.currentTimeMillis() - startTime);
+ updateState(part, recoveryState);
- if (log.isDebugEnabled()) {
- log.debug("Restored partition state (from WAL) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
- ", updCntr=" + part.initialUpdateCounter() +
- ", size=" + part.fullSize() + ']');
- }
- }
- else {
- if (log.isDebugEnabled()) {
- log.debug("Skipping partition on recovery (no page store OR wal state) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
- }
- }
+ res = U.currentTimeMillis() - startTime;
if (log.isDebugEnabled()) {
- log.debug("Finished restoring partition state " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
- ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
+ log.debug("Restored partition state (from WAL) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() +
+ ", size=" + part.fullSize() + ']');
+ }
+ }
+ else {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping partition on recovery (no page store OR wal state) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
}
}
- partitionStatesRestored = true;
+ if (log.isDebugEnabled()) {
+ log.debug("Finished restoring partition state " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
+ ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
+ }
+
+ return res;
+ }
- return processed;
+ /** {@inheritDoc} */
+ @Override public void restorePartitionStates() throws IgniteCheckedException {
+ if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled()
+ || partitionStatesRestored)
+ return;
+
+ for (int p = 0; p < grp.affinity().partitions(); p++)
+ restoreStateOfPartition(p, null);
+
+ confirmPartitionStatesRestored();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void confirmPartitionStatesRestored() {
+ partitionStatesRestored = true;
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java
index 319d885..eecd28d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java
@@ -145,12 +145,12 @@ public class RestorePartitionStateTest extends GridCommonAbstractTest {
@Test
public void testProcessedPartitionComparator() {
List<T3<Long, Long, GroupPartitionId>> exp = F.asList(
- new T3<>(0L, 0L, new GroupPartitionId(0, 0)),
+ new T3<>(0L, 2L, new GroupPartitionId(0, 0)),
new T3<>(0L, 1L, new GroupPartitionId(0, 0)),
new T3<>(1L, 1L, new GroupPartitionId(0, 0)),
- new T3<>(1L, 2L, new GroupPartitionId(0, 0)),
- new T3<>(1L, 2L, new GroupPartitionId(1, 0)),
- new T3<>(1L, 2L, new GroupPartitionId(1, 1))
+ new T3<>(1L, 0L, new GroupPartitionId(0, 0)),
+ new T3<>(1L, 0L, new GroupPartitionId(1, 0)),
+ new T3<>(1L, 0L, new GroupPartitionId(1, 1))
);
TreeSet<T3<Long, Long, GroupPartitionId>> act = new TreeSet<>(processedPartitionComparator());