You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/07/07 06:47:52 UTC
[ignite] branch master updated: IGNITE-12935 Improved logging for
historical rebalancing. - Fixes #7722.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f55901d IGNITE-12935 Improved logging for historical rebalancing. - Fixes #7722.
f55901d is described below
commit f55901d3de2148227b102e5f5260bc637617f261
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Jul 7 09:41:32 2020 +0300
IGNITE-12935 Improved logging for historical rebalancing. - Fixes #7722.
Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
.../preloader/GridDhtPartitionsExchangeFuture.java | 91 ++++++-
.../dht/preloader/SupplyPartitionInfo.java | 99 ++++++++
.../GridCacheDatabaseSharedManager.java | 65 ++++-
.../persistence/checkpoint/CheckpointHistory.java | 29 ++-
.../persistence/checkpoint/ReservationReason.java | 50 ++++
.../db/wal/IgniteWalRebalanceLoggingTest.java | 275 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsMvccTestSuite2.java | 2 +
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 +
.../processors/client/IgniteDataStreamerTest.java | 2 +
9 files changed, 598 insertions(+), 18 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a34a098..21b3d26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3349,8 +3349,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
*
* @param top Topology to assign.
* @param resetOwners True if need to reset partition state considering of counter, false otherwise.
+ * @return Partitions supply info list.
*/
- private void assignPartitionStates(GridDhtPartitionTopology top, boolean resetOwners) {
+ private List<SupplyPartitionInfo> assignPartitionStates(GridDhtPartitionTopology top, boolean resetOwners) {
Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
Map<Integer, TreeSet<Long>> varCntrs = new HashMap<>();
@@ -3424,10 +3425,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Set<Integer> haveHistory = new HashSet<>();
- assignHistoricalSuppliers(top, maxCntrs, varCntrs, haveHistory);
+ List<SupplyPartitionInfo> list = assignHistoricalSuppliers(top, maxCntrs, varCntrs, haveHistory);
if (resetOwners)
resetOwnersByCounter(top, maxCntrs, haveHistory);
+
+ return list;
}
/**
@@ -3470,8 +3473,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param maxCntrs Max counter partiton map.
* @param varCntrs Various counters for each partition.
* @param haveHistory Set of partitions witch have historical supplier.
+ * @return List of partitions which do not have a historical supplier.
*/
- private void assignHistoricalSuppliers(
+ private List<SupplyPartitionInfo> assignHistoricalSuppliers(
GridDhtPartitionTopology top,
Map<Integer, CounterWithNodes> maxCntrs,
Map<Integer, TreeSet<Long>> varCntrs,
@@ -3481,6 +3485,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null;
+ List<SupplyPartitionInfo> list = new ArrayList<>();
+
for (Map.Entry<Integer, TreeSet<Long>> e : varCntrs.entrySet()) {
int p = e.getKey();
@@ -3529,7 +3535,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
deepestReserved.set(e0.getKey(), histCntr);
}
}
+
+ // No reservation matched for this partition.
+ if (!haveHistory.contains(p)) {
+ list.add(new SupplyPartitionInfo(
+ p,
+ nonMaxCntrs.last(),
+ deepestReserved.get2(),
+ deepestReserved.get1()
+ ));
+ }
}
+
+ return list;
}
/**
@@ -4124,6 +4142,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param resetOwners True if reset partitions state needed, false otherwise.
*/
private void assignPartitionsStates(boolean resetOwners) {
+ Map<String, List<SupplyPartitionInfo>> supplyInfoMap = log.isInfoEnabled() ?
+ new ConcurrentHashMap<>() : null;
+
try {
U.doInParallel(
cctx.kernalContext().getSystemExecutorService(),
@@ -4135,8 +4156,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
? grpCtx.topology()
: cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
- if (CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
- assignPartitionStates(top, resetOwners);
+ if (CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) {
+ List<SupplyPartitionInfo> list = assignPartitionStates(top, resetOwners);
+
+ if (supplyInfoMap != null && !F.isEmpty(list))
+ supplyInfoMap.put(grpDesc.cacheOrGroupName(), list);
+ }
else if (resetOwners)
assignPartitionSizes(top);
@@ -4148,10 +4173,66 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
throw new IgniteException("Failed to assign partition states", e);
}
+ if (!F.isEmpty(supplyInfoMap))
+ printPartitionRebalancingFully(supplyInfoMap);
+
timeBag.finishGlobalStage("Assign partitions states");
}
/**
+ * Prints detailed information about partitions which did not have enough
+ * reserved history for historical rebalance.
+ *
+ * @param supplyInfoMap Map of cache group name to information about its partitions without a historical supplier.
+ */
+ private void printPartitionRebalancingFully(Map<String, List<SupplyPartitionInfo>> supplyInfoMap) {
+ try {
+ if (hasPartitionToLog(supplyInfoMap, false)) {
+ log.info("Partitions weren't present in any history reservation: [" +
+ supplyInfoMap.entrySet().stream().map(entry ->
+ "[grp=" + entry.getKey() + " part=[" + S.compact(entry.getValue().stream()
+ .filter(info -> !info.isHistoryReserved())
+ .map(info -> info.part()).collect(Collectors.toSet())) + "]]"
+ ).collect(Collectors.joining(", ")) + ']');
+ }
+
+ if (hasPartitionToLog(supplyInfoMap, true)) {
+ log.info("Partitions were reserved, but maximum available counter is greater than demanded: [" +
+ supplyInfoMap.entrySet().stream().map(entry ->
+ "[grp=" + entry.getKey() + ' ' +
+ entry.getValue().stream().filter(SupplyPartitionInfo::isHistoryReserved).map(info ->
+ "[part=" + info.part() +
+ ", minCntr=" + info.minCntr() +
+ ", maxReserved=" + info.maxReserved() +
+ ", maxReservedNodeId=" + info.maxReservedNodeId() + ']'
+ ).collect(Collectors.joining(", ")) + ']'
+ ).collect(Collectors.joining(", ")) + ']');
+ }
+ }
+ catch (Exception e) {
+ log.error("An error happened during printing partitions that have no history.", e);
+ }
+ }
+
+ /**
+ * Checks whether the information contains partitions which will be printed to the log.
+ *
+ * @param supplyInfoMap Map of cache group name to information about its partitions.
+ * @param reserved Reservation flag to look for.
+ * @return True if the map has at least one partition with the given reserved flag, false otherwise.
+ */
+ private boolean hasPartitionToLog(Map<String, List<SupplyPartitionInfo>> supplyInfoMap, boolean reserved) {
+ for (List<SupplyPartitionInfo> infos : supplyInfoMap.values()) {
+ for (SupplyPartitionInfo info : infos) {
+ if (info.isHistoryReserved() == reserved)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
* Removes gaps in the local update counters. Gaps in update counters are possible on backup node when primary
* failed to send update counter deltas to backup.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/SupplyPartitionInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/SupplyPartitionInfo.java
new file mode 100644
index 0000000..11eca52
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/SupplyPartitionInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Information about a supplier for a specific partition.
+ */
+public class SupplyPartitionInfo {
+ /**
+ * Partition number.
+ */
+ private int part;
+
+ /**
+ * Partition counter from the demander node.
+ */
+ private long minCntr;
+
+ /**
+ * Max reservation ({@code Long.MAX_VALUE} is treated as "not reserved").
+ */
+ private long maxReserved;
+
+ /**
+ * Node with the deepest history for the partition.
+ */
+ private UUID maxReservedNodeId;
+
+ /**
+ * @param part Partition.
+ * @param minCntr Minimal counter.
+ * @param maxReserved Max reservation.
+ * @param maxReservedNodeId Node with maximum reservation.
+ */
+ public SupplyPartitionInfo(int part, long minCntr, long maxReserved, UUID maxReservedNodeId) {
+ this.part = part;
+ this.minCntr = minCntr;
+ this.maxReserved = maxReserved;
+ this.maxReservedNodeId = maxReservedNodeId;
+ }
+
+ /**
+ * @return Partition.
+ */
+ public int part() {
+ return part;
+ }
+
+ /**
+ * @return Minimum counter.
+ */
+ public long minCntr() {
+ return minCntr;
+ }
+
+ /**
+ * @return Max reservation.
+ */
+ public long maxReserved() {
+ return maxReserved;
+ }
+
+ /**
+ * @return Id of the node with maximum reservation.
+ */
+ public UUID maxReservedNodeId() {
+ return maxReservedNodeId;
+ }
+
+ /**
+ * @return True if max reservation is not {@code Long.MAX_VALUE} and its node id is not null, false otherwise.
+ */
+ public boolean isHistoryReserved() {
+ return maxReserved != Long.MAX_VALUE && maxReservedNodeId != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SupplyPartitionInfo.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 4b01715..75af035 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -135,6 +135,7 @@ import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkp
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgressImpl;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.PartitionDestroyQueue;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.PartitionDestroyRequest;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.ReservationReason;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
@@ -1737,7 +1738,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Map</*grpId*/Integer, Set</*partId*/Integer>> applicableGroupsAndPartitions = partitionsApplicableForWalRebalance();
- Map</*grpId*/Integer, Map</*partId*/Integer, CheckpointEntry>> earliestValidCheckpoints;
+ Map</*grpId*/Integer, T2</*reason*/ReservationReason, Map</*partId*/Integer, CheckpointEntry>>> earliestValidCheckpoints;
checkpointReadLock();
@@ -1750,10 +1751,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Map</*grpId*/Integer, Map</*partId*/Integer, /*updCntr*/Long>> grpPartsWithCnts = new HashMap<>();
- for (Map.Entry<Integer, Map<Integer, CheckpointEntry>> e : earliestValidCheckpoints.entrySet()) {
+ for (Map.Entry<Integer, T2</*reason*/ReservationReason, Map</*partId*/Integer, CheckpointEntry>>> e : earliestValidCheckpoints.entrySet()) {
int grpId = e.getKey();
- for (Map.Entry<Integer, CheckpointEntry> e0 : e.getValue().entrySet()) {
+ if (e.getValue().get2() == null)
+ continue;
+
+ for (Map.Entry<Integer, CheckpointEntry> e0 : e.getValue().get2().entrySet()) {
CheckpointEntry cpEntry = e0.getValue();
int partId = e0.getKey();
@@ -1772,10 +1776,65 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
+ if (log.isInfoEnabled() && !F.isEmpty(earliestValidCheckpoints))
+ printReservationToLog(earliestValidCheckpoints);
+
return grpPartsWithCnts;
}
/**
+ * Prints detailed information about cache groups which were not reserved
+ * and the earliest reserved checkpoint for the cache groups which were reserved, grouped by reason.
+ *
+ * @param earliestValidCheckpoints Map of group id to (reason, partition id to checkpoint map; null map means not reserved).
+ */
+ private void printReservationToLog(
+ Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> earliestValidCheckpoints) {
+ try {
+ Map<ReservationReason, List<Integer>> notReservedCachesToPrint = new HashMap<>();
+ Map<ReservationReason, List<T2<Integer, CheckpointEntry>>> reservedCachesToPrint = new HashMap<>();
+
+ for (Map.Entry<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> entry : earliestValidCheckpoints.entrySet()) {
+ if (entry.getValue().get2() == null) {
+ notReservedCachesToPrint.computeIfAbsent(entry.getValue().get1(), reason -> new ArrayList<>())
+ .add(entry.getKey());
+ }
+ else {
+ reservedCachesToPrint.computeIfAbsent(entry.getValue().get1(), reason -> new ArrayList<>())
+ .add(new T2<>(entry.getKey(), entry.getValue().get2().values().stream().min(
+ Comparator.comparingLong(CheckpointEntry::timestamp)).get()));
+ }
+ }
+
+ if (!F.isEmpty(notReservedCachesToPrint)) {
+ log.info("Cache groups were not reserved [" +
+ notReservedCachesToPrint.entrySet().stream()
+ .map(entry -> '[' +
+ entry.getValue().stream().map(grpId -> "[grpId=" + grpId +
+ ", grpName=" + cctx.cache().cacheGroup(grpId).cacheOrGroupName() + ']')
+ .collect(Collectors.joining(", ")) +
+ ", reason=" + entry.getKey() + ']')
+ .collect(Collectors.joining(", ")) + ']');
+ }
+
+ if (!F.isEmpty(reservedCachesToPrint)) {
+ log.info("Cache groups with earliest reserved checkpoint and a reason why a previous checkpoint was inapplicable: [" +
+ reservedCachesToPrint.entrySet().stream()
+ .map(entry -> '[' +
+ entry.getValue().stream().map(grpCp -> "[grpId=" + grpCp.get1() +
+ ", grpName=" + cctx.cache().cacheGroup(grpCp.get1()).cacheOrGroupName() +
+ ", cp=(" + grpCp.get2().checkpointId() + ", " + U.format(grpCp.get2().timestamp()) + ")]")
+ .collect(Collectors.joining(", ")) +
+ ", reason=" + entry.getKey() + ']')
+ .collect(Collectors.joining(", ")) + ']');
+ }
+ }
+ catch (Exception e) {
+ log.error("An error happened during printing partitions that were reserved for potential historical rebalance.", e);
+ }
+ }
+
+ /**
* @return Map of group id -> Set of partitions which can be used as suppliers for WAL rebalance.
*/
private Map<Integer, Set<Integer>> partitionsApplicableForWalRebalance() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 131e379..dfe3798 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -550,19 +550,22 @@ public class CheckpointHistory {
*
* @param groupsAndPartitions Groups and partitions to find and reserve earliest valid checkpoint.
*
- * @return Map (groupId: Map (partitionId, earliest valid checkpoint to history search)).
+ * @return Map (groupId, Reason (the reason why reservation cannot be made deeper): Map
+ * (partitionId, earliest valid checkpoint to history search)).
*/
- public Map<Integer, Map<Integer, CheckpointEntry>> searchAndReserveCheckpoints(
+ public Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> searchAndReserveCheckpoints(
final Map<Integer, Set<Integer>> groupsAndPartitions
) {
if (F.isEmpty(groupsAndPartitions))
return Collections.emptyMap();
- final Map<Integer, Map<Integer, CheckpointEntry>> res = new HashMap<>();
+ final Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> res = new HashMap<>();
CheckpointEntry oldestCpForReservation = null;
synchronized (earliestCp) {
+ CheckpointEntry oldestHistoryCpEntry = firstCheckpoint();
+
for (Integer grpId : groupsAndPartitions.keySet()) {
CheckpointEntry oldestGrpCpEntry = null;
@@ -579,17 +582,23 @@ public class CheckpointHistory {
oldestGrpCpEntry = cpEntry;
res.computeIfAbsent(grpId, partCpMap ->
- new HashMap<>())
- .put(part, cpEntry);
+ new T2<>(ReservationReason.NO_MORE_HISTORY, new HashMap<>()))
+ .get2().put(part, cpEntry);
}
+
+ if (oldestGrpCpEntry == null || oldestGrpCpEntry != oldestHistoryCpEntry)
+ res.computeIfAbsent(grpId, (partCpMap) ->
+ new T2<>(ReservationReason.CHECKPOINT_NOT_APPLICABLE, null))
+ .set1(ReservationReason.CHECKPOINT_NOT_APPLICABLE);
}
+ }
- if (oldestCpForReservation != null) {
- if (!cctx.wal().reserve(oldestCpForReservation.checkpointMark())) {
- log.warning("Could not reserve cp " + oldestCpForReservation.checkpointMark());
+ if (oldestCpForReservation != null) {
+ if (!cctx.wal().reserve(oldestCpForReservation.checkpointMark())) {
+ log.warning("Could not reserve cp " + oldestCpForReservation.checkpointMark());
- return Collections.emptyMap();
- }
+ for (Map.Entry<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> entry : res.entrySet())
+ entry.setValue(new T2<>(ReservationReason.WAL_RESERVATION_ERROR, null));
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/ReservationReason.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/ReservationReason.java
new file mode 100644
index 0000000..5f12e45
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/ReservationReason.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
+
+/**
+ * Represents the reason why a WAL history reservation was bounded.
+ */
+public enum ReservationReason {
+ /** Reason is logged when an exception happened during
+ * reading a WAL segment or a segment cannot be reserved. */
+ WAL_RESERVATION_ERROR,
+
+ /** Reason means no more history is reserved for the cache. */
+ NO_MORE_HISTORY,
+
+ /** Reason means a checkpoint in the reserved history cannot be applied for the cache. */
+ CHECKPOINT_NOT_APPLICABLE;
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ switch (this) {
+ case WAL_RESERVATION_ERROR:
+ return "Unexpected error during processing of previous checkpoint";
+
+ case NO_MORE_HISTORY:
+ return "Reserved checkpoint is the oldest in history";
+
+ case CHECKPOINT_NOT_APPLICABLE:
+ return "Checkpoint was marked as inapplicable for historical rebalancing";
+
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
new file mode 100644
index 0000000..9b21b49
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.internal.pagemem.wal.record.RolloverType.CURRENT_SEGMENT;
+
+/**
+ * Tests for checking historical rebalance log messages.
+ */
+public class IgniteWalRebalanceLoggingTest extends GridCommonAbstractTest {
+ /** This timeout should be big enough in order to prohibit checkpoint triggered by timeout. */
+ private static final int CHECKPOINT_FREQUENCY = 600_000;
+
+ /** Test logger. */
+ private final ListeningTestLogger srvLog = new ListeningTestLogger(false, log);
+
+ /** Number of keys loaded while both nodes are running. */
+ private static final int KEYS_LOW_BORDER = 100;
+
+ /** Upper bound (exclusive) of keys loaded while the second node is stopped. */
+ private static final int KEYS_UPPER_BORDER = 200;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setGridLogger(srvLog);
+
+ DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+ storageCfg.getDefaultDataRegionConfiguration()
+ .setMaxSize(200L * 1024 * 1024)
+ .setPersistenceEnabled(true);
+
+ storageCfg.setWalMode(WALMode.LOG_ONLY)
+ .setMaxWalArchiveSize(-1)
+ .setWalCompactionEnabled(true)
+ .setWalCompactionLevel(1);
+
+ storageCfg.setCheckpointFrequency(CHECKPOINT_FREQUENCY);
+
+ cfg.setDataStorageConfiguration(storageCfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc}*/
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc}*/
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * Check that in case of historical rebalance we log appropriate messages.
+ * <p>
+ * <b>Steps:</b>
+ * <ol>
+ * <li>Set IGNITE_PDS_WAL_REBALANCE_THRESHOLD to 1.</li>
+ * <li>Start two nodes.</li>
+ * <li>Create two caches each in its own cache group and populate them with some data.</li>
+ * <li>Stop second node and add more data to both caches.</li>
+ * <li>While data is loaded, a checkpoint and a WAL segment rollover are forced on every 20th key
+ * (see forceCheckpointAndRollOwerWal).</li>
+ * <li>Start the previously stopped node and await PME.</li>
+ * </ol>
+ * <p>
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "1")
+ public void testHistoricalRebalanceLogMsg() throws Exception {
+ LogListener expMsgsLsnr = LogListener.matches(str ->
+ str.startsWith("Cache groups with earliest reserved checkpoint and a reason why a previous checkpoint was inapplicable:") &&
+ str.contains("cache_group1") && str.contains("cache_group2")).times(3).
+ andMatches(str -> str.startsWith("Starting rebalance routine") &&
+ (str.contains("cache_group1") || str.contains("cache_group2")) &&
+ str.contains("fullPartitions=[], histPartitions=[0-7]")).times(2).build();
+
+ LogListener unexpectedMessagesLsnr = LogListener.matches((str) ->
+ str.startsWith("Partitions weren't present in any history reservation:") ||
+ str.startsWith("Partitions were reserved, but maximum available counter is greater than demanded:")
+ ).build();
+
+ checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr, unexpectedMessagesLsnr);
+
+ assertTrue(expMsgsLsnr.check());
+ assertFalse(unexpectedMessagesLsnr.check());
+ }
+
+ /**
+ * Check that in case of full rebalance we log appropriate messages.
+ * <p>
+ * <b>Steps:</b>
+ * <ol>
+ * <li>restore IGNITE_PDS_WAL_REBALANCE_THRESHOLD to default 500000</li>
+ * <li>Start two nodes.</li>
+ * <li>Create two caches each in its own cache group and populate them with some data.</li>
+ * <li>Stop second node and add more data to both caches.</li>
+ * <li>While data is loaded, a checkpoint and a WAL segment rollover are forced on every 20th key
+ * (see forceCheckpointAndRollOwerWal).</li>
+ * <li>Start the previously stopped node and await PME.</li>
+ * </ol>
+ * <p>
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "500000")
+ public void testFullRebalanceLogMsgs() throws Exception {
+ LogListener expMsgsLsnr = LogListener.
+ matches("Partitions weren't present in any history reservation: " +
+ "[[grp=cache_group2 part=[[0-7]]], [grp=cache_group1 part=[[0-7]]]]").
+ andMatches(str -> str.startsWith("Starting rebalance routine") &&
+ (str.contains("cache_group1") || str.contains("cache_group2")) &&
+ str.contains("fullPartitions=[0-7], histPartitions=[]")).times(2).build();
+
+ checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr);
+
+ assertTrue(expMsgsLsnr.check());
+ }
+
+ /**
+ * Test checks log messages in case of a short checkpoint history.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, value = "2")
+ @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "1")
+ public void testFullRebalanceWithShortCpHistoryLogMsgs() throws Exception {
+ LogListener expMsgsLsnr = LogListener.
+ matches(str -> str.startsWith("Partitions were reserved, but maximum available counter is greater than demanded: ") &&
+ str.contains("grp=cache_group1") && str.contains("grp=cache_group2")).
+ andMatches(str -> str.startsWith("Starting rebalance routine") &&
+ (str.contains("cache_group1") || str.contains("cache_group2")) &&
+ str.contains("fullPartitions=[0-7], histPartitions=[]")).times(2).build();
+
+ checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr);
+
+ assertTrue(expMsgsLsnr.check());
+ }
+
+ /**
+ * Starts two nodes, loads data, restarts the second node with the given listeners registered and awaits PME.
+ *
+ * @param lsnrs Listeners to register with server logger.
+ * @throws Exception If failed.
+ */
+ private void checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(LogListener... lsnrs)
+ throws Exception {
+ startGridsMultiThreaded(2).cluster().active(true);
+
+ IgniteCache<Integer, String> cache1 = createCache("cache1", "cache_group1");
+ IgniteCache<Integer, String> cache2 = createCache("cache2", "cache_group2");
+
+ for (int i = 0; i < KEYS_LOW_BORDER; i++) {
+ cache1.put(i, "abc" + i);
+ cache2.put(i, "abc" + i);
+
+ if (i % 20 == 0)
+ forceCheckpointAndRollOwerWal();
+ }
+
+ stopGrid(1);
+
+ for (int i = KEYS_LOW_BORDER; i < KEYS_UPPER_BORDER; i++) {
+ cache1.put(i, "abc" + i);
+ cache2.put(i, "abc" + i);
+
+ if (i % 20 == 0)
+ forceCheckpointAndRollOwerWal();
+ }
+
+ srvLog.clearListeners();
+
+ for (LogListener lsnr: lsnrs)
+ srvLog.registerListener(lsnr);
+
+ startGrid(1);
+
+ awaitPartitionMapExchange(false, true, null);
+ }
+
+ /**
+ * Creates a cache with the specified name and group name, 8 partitions and 1 backup.
+ * @param cacheName Cache name.
+ * @param cacheGrpName Cache group name.
+ * @return Created cache.
+ */
+ private IgniteCache<Integer, String> createCache(String cacheName, String cacheGrpName) {
+ return ignite(0).createCache(
+ new CacheConfiguration<Integer, String>(cacheName).
+ setAffinity(new RendezvousAffinityFunction().setPartitions(8))
+ .setGroupName(cacheGrpName).
+ setBackups(1));
+ }
+
+ /**
+ * Forces a checkpoint and rolls over the WAL segment on every non-client node.
+ * It may be needed to simulate a long checkpoint history in the test.
+ *
+ * @throws Exception If failed.
+ */
+ private void forceCheckpointAndRollOwerWal() throws Exception {
+ forceCheckpoint();
+
+ for (Ignite ignite : G.allGrids()) {
+ if (ignite.cluster().localNode().isClient())
+ continue;
+
+ IgniteEx ig = (IgniteEx)ignite;
+
+ IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+
+ ig.context().cache().context().database().checkpointReadLock();
+
+ try {
+ WALPointer ptr = walMgr.log(new AdHocWALRecord(), CURRENT_SEGMENT);
+ }
+ finally {
+ ig.context().cache().context().database().checkpointReadUnlock();
+ }
+ }
+ }
+
+ /** Test WAL record. */
+ private static class AdHocWALRecord extends CheckpointRecord {
+ /** Default constructor. */
+ private AdHocWALRecord() {
+ super(null);
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
index 38f93c6..a17e889 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWALT
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFormatFileFailoverTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorExceptionDuringReadTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceLoggingTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
@@ -93,6 +94,7 @@ public class IgnitePdsMvccTestSuite2 {
ignoredTests.add(StandaloneWalRecordsIteratorTest.class);
ignoredTests.add(IgniteWALTailIsReachedDuringIterationOverArchiveTest.class);
ignoredTests.add(WalRolloverTypesTest.class);
+ ignoredTests.add(IgniteWalRebalanceLoggingTest.class);
ignoredTests.add(FsyncWalRolloverDoesNotBlockTest.class);
return IgnitePdsTestSuite2.suite(ignoredTests);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index b5a314f..2f065c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorExceptionDuringReadTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceLoggingTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalReplayingAfterRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest;
@@ -259,5 +260,7 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlDeactivateOnHighloadTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalPreloadingConcurrentTest.class, ignoredTests);
+
+ GridTestUtils.addTestIfNeeded(suite, IgniteWalRebalanceLoggingTest.class, ignoredTests);
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java
index 2e63fdb..57021aa 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java
@@ -46,6 +46,8 @@ public class IgniteDataStreamerTest extends GridCommonAbstractTest {
startGrids(2);
startClientGrid("client");
+
+ awaitPartitionMapExchange();
}
@Override protected void afterTest() throws Exception {