You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2022/06/27 10:55:11 UTC
[ignite-3] branch main updated: IGNITE-16955 Improve logging for rebalance process. Fixes #881
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 65b4130c6 IGNITE-16955 Improve logging for rebalance process. Fixes #881
65b4130c6 is described below
commit 65b4130c699db6b5b9146c8b8d3b01b3195f64a5
Author: Kirill Gusakov <kg...@gmail.com>
AuthorDate: Mon Jun 27 13:54:53 2022 +0300
IGNITE-16955 Improve logging for rebalance process. Fixes #881
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../java/org/apache/ignite/internal/raft/Loza.java | 12 ++++
.../apache/ignite/raft/jraft/core/NodeImpl.java | 2 +
.../apache/ignite/raft/jraft/core/Replicator.java | 4 ++
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 6 ++
.../internal/table/distributed/TableManager.java | 17 ++++-
.../raft/RebalanceRaftGroupEventsListener.java | 18 ++++++
.../ignite/internal/utils/RebalanceUtil.java | 73 +++++++++++++++++++---
7 files changed, 120 insertions(+), 12 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 96407ad77..d32e4d338 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
@@ -70,6 +71,9 @@ public class Loza implements IgniteComponent {
/** Retry delay. */
private static final int DELAY = 200;
+ /** Logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(Loza.class);
+
/** Cluster network service. */
private final ClusterService clusterNetSvc;
@@ -187,6 +191,8 @@ public class Loza implements IgniteComponent {
boolean hasLocalRaft = nodes.stream().anyMatch(n -> locNodeName.equals(n.name()));
if (hasLocalRaft) {
+ LOG.info("Start new raft node for group={} with initial peers={}", groupId, peers);
+
if (!raftServer.startRaftGroup(groupId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
throw new IgniteInternalException(IgniteStringFormatter.format(
"Raft group on the node is already started [node={}, raftGrp={}]",
@@ -237,6 +243,8 @@ public class Loza implements IgniteComponent {
String locNodeName = clusterNetSvc.topologyService().localMember().name();
if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
+ LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
+
if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
throw new IgniteInternalException(IgniteStringFormatter.format(
"Raft group on the node is already started [node={}, raftGrp={}]",
@@ -303,6 +311,8 @@ public class Loza implements IgniteComponent {
String locNodeName = clusterNetSvc.topologyService().localMember().name();
if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
+ LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
+
if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
throw new IgniteInternalException(IgniteStringFormatter.format(
"Raft group on the node is already started [node={}, raftGrp={}]",
@@ -337,6 +347,8 @@ public class Loza implements IgniteComponent {
}
try {
+ LOG.info("Stop raft group={}", groupId);
+
raftServer.stopRaftGroup(groupId);
} finally {
busyLock.leaveBusy();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 962fbfa90..a908d00ee 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -406,6 +406,7 @@ public class NodeImpl implements Node, RaftServerService {
}
Requires.requireTrue(this.stage == Stage.STAGE_CATCHING_UP, "Stage is not in STAGE_CATCHING_UP");
if (success) {
+ LOG.info("Catch up for peer={} was finished", peer);
this.addingPeers.remove(peer);
if (this.addingPeers.isEmpty()) {
nextStage();
@@ -496,6 +497,7 @@ public class NodeImpl implements Node, RaftServerService {
Requires.requireTrue(isBusy(), "Not in busy stage");
switch (this.stage) {
case STAGE_CATCHING_UP:
+ LOG.info("Catch up phase to change peers from={} to={} was successfully finished", oldPeers, newPeers);
if (this.nchanges > 0) {
this.stage = Stage.STAGE_JOINT;
this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners),
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index cc75449b3..63d162145 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -1016,6 +1016,10 @@ public class Replicator implements ThreadId.OnError {
if (code != RaftError.ETIMEDOUT.getNumber()) {
if (this.nextIndex - 1 + this.catchUpClosure.getMaxMargin() < this.options.getLogManager()
.getLastLogIndex()) {
+
+ LOG.debug("Catch up for peer={} in progress, current index={} (leader log last index={}, catch up margin={})",
+ getOpts().getPeerId(), nextIndex - 1, options.getLogManager().getLastLogIndex(), catchUpClosure.getMaxMargin());
+
return;
}
if (this.catchUpClosure.isErrorWasSet()) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 8ea0d0f0b..4f8891408 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -372,6 +372,9 @@ public class RaftGroupServiceImpl implements RaftGroupService {
CompletableFuture<ChangePeersAsyncResponse> fut = new CompletableFuture<>();
+ LOG.info("Sending changePeersAsync request for group={} to peers={} with leader term={}",
+ groupId, peers, term);
+
sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
return fut.handle((resp, err) -> {
@@ -567,6 +570,9 @@ public class RaftGroupServiceImpl implements RaftGroupService {
if (err != null) {
if (recoverable(err)) {
executor.schedule(() -> {
+ LOG.warn("Recoverable error during the request type={} occurred (will be retried on the randomly selected node): ",
+ err, req.getClass().getSimpleName());
+
sendWithRetry(randomNode(), req, stopTime, fut);
return null;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 80c234cbd..ff07d7806 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -367,6 +367,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (replicasCtx.oldValue() != null && replicasCtx.oldValue() > 0) {
TableConfiguration tblCfg = replicasCtx.config(TableConfiguration.class);
+ LOG.info("Received update for replicas number for table={} from replicas={} to replicas={}",
+ tblCfg.name().value(), replicasCtx.oldValue(), replicasCtx.newValue());
+
int partCnt = tblCfg.partitions().value();
int newReplicas = replicasCtx.newValue();
@@ -377,7 +380,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
String partId = partitionRaftGroupName(((ExtendedTableConfiguration) tblCfg).id().value(), i);
futures[i] = updatePendingAssignmentsKeys(
- partId, baselineMgr.nodes(),
+ tblCfg.name().value(), partId, baselineMgr.nodes(),
partCnt, newReplicas,
replicasCtx.storageRevision(), metaStorageMgr, i);
}
@@ -1277,11 +1280,17 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
? ((List<List<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part)
: (List<ClusterNode>) ByteUtils.fromBytes(stableAssignments);
+ ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
+
var deltaPeers = newPeers.stream()
.filter(p -> !assignments.contains(p))
.collect(Collectors.toList());
try {
+ LOG.info("Received update on pending key={} for partition={}, table={}, "
+ + "check if current node={} should start new raft group node for partition rebalance.",
+ pendingAssignmentsWatchEvent.key(), part, tbl.name(), localMember.address());
+
raftMgr.startRaftGroupNode(partId, assignments, deltaPeers, raftGrpLsnrSupplier,
raftGrpEvtsLsnrSupplier);
} catch (NodeStoppingException e) {
@@ -1300,10 +1309,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
IgniteBiTuple<Peer, Long> leaderWithTerm = partGrpSvc.refreshAndGetLeaderWithTerm().join();
- ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
-
// run update of raft configuration if this node is a leader
if (localMember.address().equals(leaderWithTerm.get1().address())) {
+ LOG.info("Current node={} is the leader of partition raft group={}. "
+ + "Initiate rebalance process for partition={}, table={}",
+ localMember.address(), partId, part, tbl.name());
+
partGrpSvc.changePeersAsync(newNodes, leaderWithTerm.get2()).join();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index d8df8b066..743b65844 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -140,6 +140,10 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
if (!pendingEntry.empty()) {
List<ClusterNode> pendingNodes = (List<ClusterNode>) ByteUtils.fromBytes(pendingEntry.value());
+ LOG.info("New leader was elected for the raft group={} "
+ + "of partition={}, table={} and pending reconfiguration to peers={} was discovered",
+ partId, partNum, tblConfiguration.name().value(), pendingNodes);
+
movePartitionFn.apply(clusterNodesToPeers(pendingNodes), term).join();
}
} catch (InterruptedException | ExecutionException e) {
@@ -269,16 +273,30 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
remove(plannedPartAssignmentsKey(partId)))
.yield(true),
ops().yield(false))).get().getAsBoolean()) {
+ LOG.info("Planned key={} was changed, while trying to update rebalance information about partition={}, table={} "
+ + "to peers={}, another attempt will be made",
+ plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+
doOnNewPeersConfigurationApplied(peers);
}
+
+ LOG.info("Finished rebalance of partition={}, table={} to peers={} and queued new rebalance to peers={}",
+ partNum, tblConfiguration.name().value(), appliedPeers, ByteUtils.fromBytes(plannedEntry.value()));
} else {
if (!metaStorageMgr.invoke(If.iif(
notExists(plannedPartAssignmentsKey(partId)),
ops(put(stablePartAssignmentsKey(partId), ByteUtils.toBytes(appliedPeers)),
remove(pendingPartAssignmentsKey(partId))).yield(true),
ops().yield(false))).get().getAsBoolean()) {
+ LOG.info("Planned key={} was changed, while trying to update rebalance information about partition={}, table={} "
+ + "to peers={}, another attempt will be made",
+ plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+
doOnNewPeersConfigurationApplied(peers);
}
+
+ LOG.info("Finished rebalance of partition={}, table={} to peers={} and no new rebalance in planned key={} discovered",
+ partNum, tblConfiguration.name().value(), appliedPeers, plannedPartAssignmentsKey(partId));
}
rebalanceAttempts.set(0);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index 47e253746..8d20a89e4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -31,9 +31,9 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.client.If;
-import org.apache.ignite.internal.metastorage.client.StatementResult;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
@@ -42,9 +42,33 @@ import org.jetbrains.annotations.NotNull;
*/
public class RebalanceUtil {
+ /** Logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(RebalanceUtil.class);
+
+ /** Return code of metastore multi-invoke which identifies
+ * that the pending key was updated to a new value (i.e. there is no active rebalance at the moment of the call).
+ */
+ private static final int PENDING_KEY_UPDATED = 0;
+
+ /** Return code of metastore multi-invoke which identifies
+ * that the planned key was updated to a new value (i.e. there is an active rebalance at the moment of the call).
+ */
+ private static final int PLANNED_KEY_UPDATED = 1;
+
+ /** Return code of metastore multi-invoke which identifies
+ * that the planned key was removed, because the current rebalance already has the same target.
+ */
+ private static final int PLANNED_KEY_REMOVED = 2;
+
+ /** Return code of metastore multi-invoke which identifies
+ * that this trigger event was already processed by another node and must be skipped.
+ */
+ private static final int OUTDATED_UPDATE_RECEIVED = 3;
+
/**
* Update keys that related to rebalance algorithm in Meta Storage. Keys are specific for partition.
*
+ * @param tableName Table name.
* @param partId Unique identifier of a partition.
* @param baselineNodes Nodes in baseline.
* @param partitions Number of partitions in a table.
@@ -53,8 +77,8 @@ public class RebalanceUtil {
* @param metaStorageMgr Meta Storage manager.
* @return Future representing result of updating keys in {@code metaStorageMgr}
*/
- public static @NotNull CompletableFuture<StatementResult> updatePendingAssignmentsKeys(
- String partId, Collection<ClusterNode> baselineNodes,
+ public static @NotNull CompletableFuture<Void> updatePendingAssignmentsKeys(
+ String tableName, String partId, Collection<ClusterNode> baselineNodes,
int partitions, int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum) {
ByteArray partChangeTriggerKey = partChangeTriggerKey(partId);
@@ -84,16 +108,47 @@ public class RebalanceUtil {
ops(
put(partAssignmentsPendingKey, partAssignmentsBytes),
put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
- ).yield(),
+ ).yield(PENDING_KEY_UPDATED),
If.iif(value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
ops(
put(partAssignmentsPlannedKey, partAssignmentsBytes),
put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
- ).yield(),
- ops(remove(partAssignmentsPlannedKey)).yield())),
- ops().yield());
-
- return metaStorageMgr.invoke(iif);
+ ).yield(PLANNED_KEY_UPDATED),
+ ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED))),
+ ops().yield(OUTDATED_UPDATE_RECEIVED));
+
+ return metaStorageMgr.invoke(iif).thenAccept(sr -> {
+ switch (sr.getAsInt()) {
+ case PENDING_KEY_UPDATED:
+ LOG.info(
+ "Update metastore pending partitions key={} for partition={}, table={} to {}",
+ partAssignmentsPendingKey.toString(), partNum, tableName,
+ ByteUtils.fromBytes(partAssignmentsBytes));
+
+ break;
+ case PLANNED_KEY_UPDATED:
+ LOG.info(
+ "Update metastore planned partitions key={} for partition={}, table={} to {}",
+ partAssignmentsPlannedKey, partNum, tableName, ByteUtils.fromBytes(partAssignmentsBytes));
+
+ break;
+ case PLANNED_KEY_REMOVED:
+ LOG.info(
+ "Remove planned key={} for partition={}, table={} due to the fact, "
+ + "that current pending key has the same value as planned={}",
+ partAssignmentsPlannedKey.toString(), partNum, tableName, ByteUtils.fromBytes(partAssignmentsBytes));
+
+ break;
+ case OUTDATED_UPDATE_RECEIVED:
+ LOG.debug(
+ "Received outdated rebalance trigger event with revision={} for partition={}, table={}",
+ revision, partNum, tableName);
+
+ break;
+ default:
+ throw new IllegalStateException("Unknown return code for rebalance metastore multi-invoke");
+ }
+ });
}
/** Key prefix for pending assignments. */