You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/02/20 13:08:47 UTC
[ignite] branch master updated: IGNITE-13557 Logging improvements
for PDS memory restore process (#8751)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b3d760f IGNITE-13557 Logging improvements for PDS memory restore process (#8751)
b3d760f is described below
commit b3d760f8658d8ce4f905b54477690a60a588383c
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Sat Feb 20 16:08:18 2021 +0300
IGNITE-13557 Logging improvements for PDS memory restore process (#8751)
---
.../processors/cache/GridCacheProcessor.java | 189 +++++++++++++++---
.../cache/IgniteCacheOffheapManager.java | 8 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 6 +-
.../cache/persistence/GridCacheOffheapManager.java | 48 +++--
.../cache/RestorePartitionStateTest.java | 220 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite.java | 3 +
6 files changed, 427 insertions(+), 47 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d82ef97..67ee23f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
@@ -29,6 +30,10 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.StringJoiner;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -160,6 +165,7 @@ import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -192,6 +198,9 @@ import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@@ -253,6 +262,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** MBean group for cache group metrics */
private static final String CACHE_GRP_METRICS_MBEAN_GRP = "Cache groups";
+ /**
+ * Initial timeout (in milliseconds) before the progress of restoring partition states is logged.
+ * After the first progress message, the following ones are logged every {@code value / 5} milliseconds.
+ * Deliberately not final so that tests can lower it; it is recommended to change it only in tests.
+ */
+ static long TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS = TimeUnit.MINUTES.toMillis(5);
+
/** Shared cache context. */
private GridCacheSharedContext<?, ?> sharedCtx;
@@ -784,7 +800,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public List<GridCacheAdapter> blockGateways(Collection<Integer> cacheGrpIds) {
List<GridCacheAdapter> affectedCaches = internalCaches().stream()
.filter(cache -> cacheGrpIds.contains(cache.context().groupId()))
- .collect(Collectors.toList());
+ .collect(toList());
affectedCaches.forEach(cache -> {
// Add proxy if it's not initialized.
@@ -1702,7 +1718,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<StartCacheInfo> startCacheInfos = locJoinCtx.caches().stream()
.map(cacheInfo -> new StartCacheInfo(cacheInfo.get1(), cacheInfo.get2(), exchTopVer, false))
- .collect(Collectors.toList());
+ .collect(toList());
locJoinCtx.initCaches()
.forEach(cacheDesc -> {
@@ -1748,7 +1764,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<StartCacheInfo> startCacheInfos = receivedCaches.stream()
.filter(desc -> isLocalAffinity(desc.groupDescriptor().config()))
.map(desc -> new StartCacheInfo(desc, null, exchTopVer, false))
- .collect(Collectors.toList());
+ .collect(toList());
prepareStartCaches(startCacheInfos);
@@ -1876,7 +1892,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<StartCacheInfo> cacheInfosInOriginalOrder = startCacheInfos.stream()
.filter(successfullyPreparedCaches::contains)
- .collect(Collectors.toList());
+ .collect(toList());
for (StartCacheInfo startCacheInfo : cacheInfosInOriginalOrder) {
cacheStartFailHandler.handle(
@@ -2753,7 +2769,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<IgniteBiTuple<CacheGroupContext, Boolean>> grpsToStop = exchActions.cacheGroupsToStop().stream()
.filter(a -> cacheGrps.containsKey(a.descriptor().groupId()))
.map(a -> F.t(cacheGrps.get(a.descriptor().groupId()), a.destroy()))
- .collect(Collectors.toList());
+ .collect(toList());
// Wait until all evictions are finished.
grpsToStop.forEach(t -> sharedCtx.evict().onCacheGroupStopped(t.get1()));
@@ -2762,7 +2778,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
removeOffheapListenerAfterCheckpoint(grpsToStop);
Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream()
- .collect(Collectors.groupingBy(action -> action.descriptor().groupId()));
+ .collect(groupingBy(action -> action.descriptor().groupId()));
try {
doInParallel(
@@ -3552,12 +3568,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cacheNameClo = () -> cfgs.stream()
.map(StoredCacheData::config)
.map(CacheConfiguration::getName)
- .collect(Collectors.toList()).toString();
+ .collect(toList()).toString();
cacheGrpNameClo = () -> cfgs.stream()
.map(StoredCacheData::config)
.map(CacheConfiguration::getGroupName)
- .collect(Collectors.toList()).toString();
+ .collect(toList()).toString();
}
}
@@ -3583,11 +3599,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
else {
cacheNameClo = () -> Stream.of(cfgs)
.map(CacheConfiguration::getName)
- .collect(Collectors.toList()).toString();
+ .collect(toList()).toString();
cacheGrpNameClo = () -> Stream.of(cfgs)
.map(CacheConfiguration::getGroupName)
- .collect(Collectors.toList()).toString();
+ .collect(toList()).toString();
}
}
@@ -3701,7 +3717,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean disabledAfterStart
) {
return dynamicStartCachesByStoredConf(
- ccfgList.stream().map(StoredCacheData::new).collect(Collectors.toList()),
+ ccfgList.stream().map(StoredCacheData::new).collect(toList()),
failIfExists,
checkThreadTx,
disabledAfterStart,
@@ -3730,7 +3746,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<String> cacheNames = storedCacheDataList.stream()
.map(StoredCacheData::config)
.map(CacheConfiguration::getName)
- .collect(Collectors.toList());
+ .collect(toList());
return format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheNames, "dynamicStartCachesByStoredConf");
});
@@ -3885,7 +3901,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return dynamicChangeCaches(
cacheNames.stream().map(cacheName -> createStopRequest(cacheName, false, null, destroy))
- .collect(Collectors.toList())
+ .collect(toList())
);
}
@@ -4575,7 +4591,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return cachesInfo.registeredCaches().values()
.stream()
.filter(desc -> isPersistentCache(desc.cacheConfiguration(), ctx.config().getDataStorageConfiguration()))
- .collect(Collectors.toList());
+ .collect(toList());
}
/**
@@ -4585,7 +4601,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return cachesInfo.registeredCacheGroups().values()
.stream()
.filter(CacheGroupDescriptor::persistenceEnabled)
- .collect(Collectors.toList());
+ .collect(toList());
}
/**
@@ -5312,7 +5328,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return IntStream.range(0, pagesList.bucketsCount())
.mapToObj(bucket -> new CachePagesListView(pagesList, bucket, dataStore.partId()))
- .collect(Collectors.toList());
+ .collect(toList());
}, true, cacheDataStore -> partId == null || cacheDataStore.partId() == partId));
}
@@ -5452,13 +5468,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Restoring the state of partitions for cache groups.
+ *
* @param forGroups Cache groups.
- * @param partitionStates Partition states.
+ * @param partStates Partition states.
* @throws IgniteCheckedException If failed.
*/
private void restorePartitionStates(
Collection<CacheGroupContext> forGroups,
- Map<GroupPartitionId, Integer> partitionStates
+ Map<GroupPartitionId, Integer> partStates
) throws IgniteCheckedException {
long startRestorePart = U.currentTimeMillis();
@@ -5473,12 +5491,42 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
+ AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
+
+ long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
+
for (CacheGroupContext grp : forGroups) {
sysPool.execute(() -> {
try {
- long processed = grp.offheap().restorePartitionStates(partitionStates);
+ Map<Integer, Long> processed = grp.offheap().restorePartitionStates(partStates);
+
+ totalProcessed.addAndGet(processed.size());
+
+ if (log.isInfoEnabled()) {
+ TreeSet<T3<Long, Long, GroupPartitionId>> top =
+ new TreeSet<>(processedPartitionComparator());
+
+ long ts = System.currentTimeMillis();
+
+ for (Map.Entry<Integer, Long> e : processed.entrySet()) {
+ top.add(new T3<>(e.getValue(), ts, new GroupPartitionId(grp.groupId(), e.getKey())));
+
+ trimToSize(top, 5);
+ }
+
+ topPartRef.updateAndGet(top0 -> {
+ if (top0 == null)
+ return top;
+
+ for (T3<Long, Long, GroupPartitionId> t2 : top0) {
+ top.add(t2);
- totalProcessed.addAndGet(processed);
+ trimToSize(top, 5);
+ }
+
+ return top;
+ });
+ }
}
catch (IgniteCheckedException | RuntimeException | Error e) {
U.error(log, "Failed to restore partition state for " +
@@ -5497,9 +5545,29 @@ public class GridCacheProcessor extends GridProcessorAdapter {
});
}
+ boolean printTop = false;
+
try {
// Await completion restore state tasks in all stripes.
- completionLatch.await();
+ if (!log.isInfoEnabled())
+ completionLatch.await();
+ else {
+ long timeout = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS;
+
+ while (!completionLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+ if (log.isInfoEnabled()) {
+ @Nullable SortedSet<T3<Long, Long, GroupPartitionId>> top = topPartRef.get();
+
+ log.info("Restore partitions state progress [grpCnt=" +
+ (forGroups.size() - completionLatch.getCount()) + '/' + forGroups.size() +
+ ", partitionCnt=" + totalProcessed.get() + '/' + totalPart + (top == null ? "" :
+ ", topProcessedPartitions=" + toStringTopProcessingPartitions(top, forGroups)) + ']');
+ }
+
+ timeout = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS / 5;
+ printTop = true;
+ }
+ }
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
@@ -5509,11 +5577,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (restoreStateError.get() != null)
throw restoreStateError.get();
- if (log.isInfoEnabled())
+ if (log.isInfoEnabled()) {
+ SortedSet<T3<Long, Long, GroupPartitionId>> t = printTop ? topPartRef.get() : null;
+
log.info("Finished restoring partition state for local groups [" +
"groupsProcessed=" + forGroups.size() +
", partitionsProcessed=" + totalProcessed.get() +
- ", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]");
+ ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startRestorePart) +
+ (t == null ? "" : ", topProcessedPartitions=" + toStringTopProcessingPartitions(t, forGroups)) +
+ "]");
+ }
}
/**
@@ -5769,4 +5842,74 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DELEGATE.removeNode(nodeId);
}
}
+
+ /**
+ * Builds a string with the top processed partitions, grouped by processing time (longest first) and by cache group.
+ *
+ * @param top Top processed partitions (ascending): 1 - processing time, 2 - timestamp, 3 - partition of group.
+ * @param groups Cache group contexts; a group missing here is printed by id and without partition sizes.
+ * @return String representation.
+ */
+ static String toStringTopProcessingPartitions(
+ SortedSet<T3<Long, Long, GroupPartitionId>> top,
+ Collection<CacheGroupContext> groups
+ ) {
+ if (top.isEmpty())
+ return "[]";
+
+ StringJoiner sj0 = new StringJoiner(", ", "[", "]");
+
+ TreeMap<Long, List<GroupPartitionId>> top0 = top.stream().collect(groupingBy( // Processing time -> partitions.
+ T3::get1,
+ TreeMap::new,
+ mapping(T3::get3, toList())
+ ));
+
+ for (Map.Entry<Long, List<GroupPartitionId>> e0 : top0.descendingMap().entrySet()) { // Longest time first.
+ Map<Integer, List<GroupPartitionId>> byCacheGrpId =
+ e0.getValue().stream().collect(groupingBy(GroupPartitionId::getGroupId));
+
+ StringJoiner sj1 = new StringJoiner(", ", "[", "]");
+
+ for (Map.Entry<Integer, List<GroupPartitionId>> e1 : byCacheGrpId.entrySet()) {
+ @Nullable CacheGroupContext grp =
+ groups.stream().filter(g -> g.groupId() == e1.getKey()).findAny().orElse(null);
+
+ String parts = e1.getValue().stream().map(GroupPartitionId::getPartitionId).sorted()
+ .map(p -> grp == null ? p.toString() : p + ":" + grp.topology().localPartition(p).fullSize()) // NOTE(review): localPartition(p) is dereferenced without a null check - confirm it cannot be null here.
+ .collect(Collectors.joining(", ", "[", "]"));
+
+ sj1.add("[grp=" + (grp == null ? e1.getKey() : grp.cacheOrGroupName()) + ", part=" + parts + ']');
+ }
+
+ sj0.add("[time=" + U.humanReadableDuration(e0.getKey()) + ' ' + sj1.toString() + ']');
+ }
+
+ return sj0.toString();
+ }
+
+ /**
+ * Trims the set to at most {@code size} elements.
+ * Elements are removed one by one, starting from the first (smallest) element of the set.
+ *
+ * @param set Set to trim, modified in place.
+ * @param size Maximum number of elements to keep; a negative value empties the set and then fails on {@code first()}.
+ */
+ static <E> void trimToSize(SortedSet<E> set, int size) {
+ while (set.size() > size)
+ set.remove(set.first());
+ }
+
+ /**
+ * Creates a comparator of processed partitions.
+ * Tuple layout: 1 - processing duration, 2 - timestamp, 3 - partition of a cache group.
+ * Compares by duration first, then by timestamp, then by partition of group (natural order of each).
+ *
+ * @return Comparator.
+ */
+ static Comparator<T3<Long, Long, GroupPartitionId>> processedPartitionComparator() {
+ Comparator<T3<Long, Long, GroupPartitionId>> comp = Comparator.comparing(T3::get1);
+
+ return comp.thenComparing(T3::get2).thenComparing(T3::get3);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index ceebc72..61e1605 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -92,11 +92,13 @@ public interface IgniteCacheOffheapManager {
/**
* Pre-create partitions that resides in page memory or WAL and restores their state.
*
- * @param partitionRecoveryStates Partition recovery states.
- * @return Number of processed partitions.
+ * @param partRecoveryStates Partition recovery states.
+ * @return Processed partitions: partition id -> processing time in millis.
* @throws IgniteCheckedException If failed.
*/
- long restorePartitionStates(Map<GroupPartitionId, Integer> partitionRecoveryStates) throws IgniteCheckedException;
+ Map<Integer, Long> restorePartitionStates(
+ Map<GroupPartitionId, Integer> partRecoveryStates
+ ) throws IgniteCheckedException;
/**
* Partition counter update callback. May be overridden by plugin-provided subclasses.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 267430a..32d59aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -271,8 +271,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public long restorePartitionStates(Map<GroupPartitionId, Integer> partitionRecoveryStates) throws IgniteCheckedException {
- return 0; // No-op.
+ @Override public Map<Integer, Long> restorePartitionStates(
+ Map<GroupPartitionId, Integer> partRecoveryStates
+ ) throws IgniteCheckedException {
+ return Collections.emptyMap(); // No-op.
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 376b14b..0e85ea3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -609,38 +609,41 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public long restorePartitionStates(Map<GroupPartitionId, Integer> partitionRecoveryStates) throws IgniteCheckedException {
- if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled())
- return 0;
-
- if (partitionStatesRestored)
- return 0;
+ @Override public Map<Integer, Long> restorePartitionStates(
+ Map<GroupPartitionId, Integer> partRecoveryStates
+ ) throws IgniteCheckedException {
+ if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled()
+ || partitionStatesRestored)
+ return Collections.emptyMap();
- long processed = 0;
+ Map<Integer, Long> processed = new HashMap<>();
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
for (int p = 0; p < grp.affinity().partitions(); p++) {
- Integer recoverState = partitionRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
+ Integer recoverState = partRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
long startTime = U.currentTimeMillis();
+ if (log.isDebugEnabled())
+ log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+
if (ctx.pageStore().exists(grp.groupId(), p)) {
ctx.pageStore().ensure(grp.groupId(), p);
if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Skipping partition on recovery (pages less than 1) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+ }
continue;
}
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Creating partition on recovery (exists in page store) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
-
- processed++;
+ }
GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
@@ -666,22 +669,24 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
updateState(part, recoverState);
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Restored partition state (from WAL) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
", updCntr=" + part.initialUpdateCounter() +
", size=" + part.fullSize() + ']');
+ }
}
else {
int stateId = io.getPartitionState(pageAddr);
updateState(part, stateId);
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Restored partition state (from page memory) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
", size=" + part.fullSize() + ']');
+ }
}
}
finally {
@@ -695,30 +700,35 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
finally {
ctx.database().checkpointReadUnlock();
}
+
+ processed.put(p, U.currentTimeMillis() - startTime);
}
else if (recoverState != null) { // Pre-create partition if having valid state.
GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
updateState(part, recoverState);
- processed++;
+ processed.put(p, U.currentTimeMillis() - startTime);
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Restored partition state (from WAL) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
", updCntr=" + part.initialUpdateCounter() +
", size=" + part.fullSize() + ']');
+ }
}
else {
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Skipping partition on recovery (no page store OR wal state) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+ }
}
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Finished restoring partition state " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p +
- ", time=" + (U.currentTimeMillis() - startTime) + " ms]");
+ ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
+ }
}
partitionStatesRestored = true;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java
new file mode 100644
index 0000000..319d885
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.function.UnaryOperator;
+import java.util.stream.LongStream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheProcessor.TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS;
+import static org.apache.ignite.internal.processors.cache.GridCacheProcessor.processedPartitionComparator;
+import static org.apache.ignite.internal.processors.cache.GridCacheProcessor.toStringTopProcessingPartitions;
+import static org.apache.ignite.internal.processors.cache.GridCacheProcessor.trimToSize;
+
+/**
+ * Tests the helpers used for restoring partition states and the logging of the restore progress.
+ */
+public class RestorePartitionStateTest extends GridCommonAbstractTest {
+ /** Value of the progress output timeout saved before a test and restored after it. */
+ private long timeoutOutputRestoreProgress;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+
+ timeoutOutputRestoreProgress = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+
+ TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS = timeoutOutputRestoreProgress;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setCacheConfiguration(
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME + "0")
+ .setAffinity(new RendezvousAffinityFunction(false, 32)),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME + "1")
+ .setAffinity(new RendezvousAffinityFunction(false, 32)),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME + "3")
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ )
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+ );
+ }
+
+ /**
+ * Checks the string representation of the top processed partitions when no cache group contexts are passed.
+ */
+ @Test
+ public void testTopRestorePartitionsToString() {
+ TreeSet<T3<Long, Long, GroupPartitionId>> s = new TreeSet<>(processedPartitionComparator());
+
+ s.add(new T3<>(10L, 0L, new GroupPartitionId(0, 0)));
+ s.add(new T3<>(10L, 0L, new GroupPartitionId(0, 1)));
+ s.add(new T3<>(10L, 0L, new GroupPartitionId(1, 1)));
+
+ s.add(new T3<>(20L, 0L, new GroupPartitionId(2, 0)));
+ s.add(new T3<>(20L, 0L, new GroupPartitionId(2, 1)));
+
+ String exp = "[[time=20ms [[grp=2, part=[0, 1]]]], [time=10ms [[grp=0, part=[0, 1]], [grp=1, part=[1]]]]]";
+
+ assertEquals(exp, toStringTopProcessingPartitions(s, Collections.emptyList()));
+ }
+
+ /**
+ * Tests {@link GridCacheProcessor#trimToSize}: the smallest elements are removed first.
+ */
+ @Test
+ public void testTrimToSize() {
+ TreeSet<Long> act = new TreeSet<>();
+
+ trimToSize(act, 0);
+ trimToSize(act, 1);
+
+ LongStream.range(0, 10).forEach(act::add);
+
+ trimToSize(act, act.size());
+ assertEquals(10, act.size());
+
+ TreeSet<Long> exp = new TreeSet<>(act);
+
+ trimToSize(act, 9);
+ assertEqualsCollections(exp.tailSet(1L), act);
+
+ trimToSize(act, 5);
+ assertEqualsCollections(exp.tailSet(5L), act);
+
+ trimToSize(act, 3);
+ assertEqualsCollections(exp.tailSet(7L), act);
+
+ trimToSize(act, 0);
+ assertEqualsCollections(exp.tailSet(10L), act);
+ }
+
+ /**
+ * Tests {@link GridCacheProcessor#processedPartitionComparator}: duration, then timestamp, then partition.
+ */
+ @Test
+ public void testProcessedPartitionComparator() {
+ List<T3<Long, Long, GroupPartitionId>> exp = F.asList(
+ new T3<>(0L, 0L, new GroupPartitionId(0, 0)),
+ new T3<>(0L, 1L, new GroupPartitionId(0, 0)),
+ new T3<>(1L, 1L, new GroupPartitionId(0, 0)),
+ new T3<>(1L, 2L, new GroupPartitionId(0, 0)),
+ new T3<>(1L, 2L, new GroupPartitionId(1, 0)),
+ new T3<>(1L, 2L, new GroupPartitionId(1, 1))
+ );
+
+ TreeSet<T3<Long, Long, GroupPartitionId>> act = new TreeSet<>(processedPartitionComparator());
+ act.addAll(exp);
+
+ assertEqualsCollections(exp, act);
+ }
+
+ /**
+ * Checks that the progress of restoring partitions, including the top partitions, is printed to the log.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLogTopPartitions() throws Exception {
+ IgniteEx n = startGrid(0);
+
+ n.cluster().state(ClusterState.ACTIVE);
+ awaitPartitionMapExchange();
+
+ ((GridCacheDatabaseSharedManager)n.context().cache().context().database())
+ .enableCheckpoints(false).get(getTestTimeout());
+
+ for (IgniteInternalCache cache : n.context().cache().caches()) {
+ for (int i = 0; i < 1_000; i++)
+ cache.put(i, cache.name() + i);
+ }
+
+ stopAllGrids();
+ awaitPartitionMapExchange();
+
+ LogListener startPartRestore = LogListener.matches(logStr -> {
+ if (logStr.startsWith("Started restoring partition state [grp=")) {
+ try {
+ U.sleep(15); // Presumably slows the restore down so that the 150 ms progress timeout set below expires.
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ return true;
+ }
+
+ return false;
+ }).build();
+
+ LogListener progressPartRestore = LogListener.matches("Restore partitions state progress")
+ .andMatches("topProcessedPartitions").build();
+
+ LogListener finishPartRestore = LogListener.matches("Finished restoring partition state for local groups")
+ .andMatches("topProcessedPartitions").build();
+
+ TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS = 150L;
+
+ setRootLoggerDebugLevel(); // The "Started restoring partition state" message is logged at debug level.
+
+ startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
+ cfg.setGridLogger(new ListeningTestLogger(log, startPartRestore, progressPartRestore, finishPartRestore));
+
+ return cfg;
+ });
+
+ assertTrue(startPartRestore.check());
+ assertTrue(progressPartRestore.check());
+ assertTrue(finishPartRestore.check());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 75aaed9..6034074 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.AutoActivationPropertyTest;
import org.apache.ignite.internal.processors.cache.ClusterStateOnStartPropertyTest;
import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistence;
import org.apache.ignite.internal.processors.cache.IgnitePdsDataRegionMetricsTxTest;
+import org.apache.ignite.internal.processors.cache.RestorePartitionStateTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheConfigurationFileConsistencyCheckTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheObjectBinaryProcessorOnDiscoveryTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsClientNearCachePutGetTest;
@@ -140,6 +141,8 @@ public class IgnitePdsTestSuite {
GridTestUtils.addTestIfNeeded(suite, WalArchiveConsistencyTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, RestorePartitionStateTest.class, ignoredTests);
+
return suite;
}