Posted to commits@ignite.apache.org by sk...@apache.org on 2022/06/27 10:55:11 UTC

[ignite-3] branch main updated: IGNITE-16955 Improve logging for rebalance process. Fixes #881

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 65b4130c6 IGNITE-16955 Improve logging for rebalance process. Fixes #881
65b4130c6 is described below

commit 65b4130c699db6b5b9146c8b8d3b01b3195f64a5
Author: Kirill Gusakov <kg...@gmail.com>
AuthorDate: Mon Jun 27 13:54:53 2022 +0300

    IGNITE-16955 Improve logging for rebalance process. Fixes #881
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../java/org/apache/ignite/internal/raft/Loza.java | 12 ++++
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  2 +
 .../apache/ignite/raft/jraft/core/Replicator.java  |  4 ++
 .../raft/jraft/rpc/impl/RaftGroupServiceImpl.java  |  6 ++
 .../internal/table/distributed/TableManager.java   | 17 ++++-
 .../raft/RebalanceRaftGroupEventsListener.java     | 18 ++++++
 .../ignite/internal/utils/RebalanceUtil.java       | 73 +++++++++++++++++++---
 7 files changed, 120 insertions(+), 12 deletions(-)

diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 96407ad77..d32e4d338 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
@@ -70,6 +71,9 @@ public class Loza implements IgniteComponent {
     /** Retry delay. */
     private static final int DELAY = 200;
 
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(Loza.class);
+
     /** Cluster network service. */
     private final ClusterService clusterNetSvc;
 
@@ -187,6 +191,8 @@ public class Loza implements IgniteComponent {
         boolean hasLocalRaft = nodes.stream().anyMatch(n -> locNodeName.equals(n.name()));
 
         if (hasLocalRaft) {
+            LOG.info("Start new raft node for group={} with initial peers={}", groupId, peers);
+
             if (!raftServer.startRaftGroup(groupId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
                 throw new IgniteInternalException(IgniteStringFormatter.format(
                         "Raft group on the node is already started [node={}, raftGrp={}]",
@@ -237,6 +243,8 @@ public class Loza implements IgniteComponent {
             String locNodeName = clusterNetSvc.topologyService().localMember().name();
 
             if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
+                LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
+
                 if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
                     throw new IgniteInternalException(IgniteStringFormatter.format(
                             "Raft group on the node is already started [node={}, raftGrp={}]",
@@ -303,6 +311,8 @@ public class Loza implements IgniteComponent {
         String locNodeName = clusterNetSvc.topologyService().localMember().name();
 
         if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
+            LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
+
             if (!raftServer.startRaftGroup(grpId,  raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
                 throw new IgniteInternalException(IgniteStringFormatter.format(
                         "Raft group on the node is already started [node={}, raftGrp={}]",
@@ -337,6 +347,8 @@ public class Loza implements IgniteComponent {
         }
 
         try {
+            LOG.info("Stop raft group={}", groupId);
+
             raftServer.stopRaftGroup(groupId);
         } finally {
             busyLock.leaveBusy();
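
The statements added in Loza.java above use the parameterized IgniteLogger pattern: a static logger is obtained once per class via IgniteLogger.forClass, and each {} placeholder in the message is filled, in order, from the trailing arguments. A minimal sketch of that pattern, assuming only the IgniteLogger API visible in this diff (the class name and values below are illustrative, not part of the commit):

    import java.util.List;

    import org.apache.ignite.lang.IgniteLogger;

    class RaftGroupLoggingExample {
        /** Logger, created once per class, as in Loza. */
        private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupLoggingExample.class);

        void startGroup(String groupId, List<String> peers) {
            // Each {} is substituted, in order, with the corresponding trailing argument.
            LOG.info("Start new raft node for group={} with initial peers={}", groupId, peers);
        }
    }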
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 962fbfa90..a908d00ee 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -406,6 +406,7 @@ public class NodeImpl implements Node, RaftServerService {
             }
             Requires.requireTrue(this.stage == Stage.STAGE_CATCHING_UP, "Stage is not in STAGE_CATCHING_UP");
             if (success) {
+                LOG.info("Catch up for peer={} was finished", peer);
                 this.addingPeers.remove(peer);
                 if (this.addingPeers.isEmpty()) {
                     nextStage();
@@ -496,6 +497,7 @@ public class NodeImpl implements Node, RaftServerService {
             Requires.requireTrue(isBusy(), "Not in busy stage");
             switch (this.stage) {
                 case STAGE_CATCHING_UP:
+                    LOG.info("Catch up phase to change peers from={} to={} was successfully finished", oldPeers, newPeers);
                     if (this.nchanges > 0) {
                         this.stage = Stage.STAGE_JOINT;
                         this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners),
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index cc75449b3..63d162145 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -1016,6 +1016,10 @@ public class Replicator implements ThreadId.OnError {
         if (code != RaftError.ETIMEDOUT.getNumber()) {
             if (this.nextIndex - 1 + this.catchUpClosure.getMaxMargin() < this.options.getLogManager()
                 .getLastLogIndex()) {
+
+                LOG.debug("Catch up for peer={} in progress, current index={} (leader log last index={}, catch up margin={})",
+                        getOpts().getPeerId(), nextIndex - 1, options.getLogManager().getLastLogIndex(), catchUpClosure.getMaxMargin());
+
                 return;
             }
             if (this.catchUpClosure.isErrorWasSet()) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 8ea0d0f0b..4f8891408 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -372,6 +372,9 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
         CompletableFuture<ChangePeersAsyncResponse> fut = new CompletableFuture<>();
 
+        LOG.info("Sending changePeersAsync request for group={} to peers={} with leader term={}",
+                groupId, peers, term);
+
         sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
 
         return fut.handle((resp, err) -> {
@@ -567,6 +570,9 @@ public class RaftGroupServiceImpl implements RaftGroupService {
                 if (err != null) {
                     if (recoverable(err)) {
                         executor.schedule(() -> {
+                            LOG.warn("Recoverable error during the request type={} occurred (will be retried on the randomly selected node): ",
+                                    err, req.getClass().getSimpleName());
+
                             sendWithRetry(randomNode(), req, stopTime, fut);
 
                             return null;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 80c234cbd..ff07d7806 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -367,6 +367,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             if (replicasCtx.oldValue() != null && replicasCtx.oldValue() > 0) {
                 TableConfiguration tblCfg = replicasCtx.config(TableConfiguration.class);
 
+                LOG.info("Received update for replicas number for table={} from replicas={} to replicas={}",
+                        tblCfg.name().value(), replicasCtx.oldValue(), replicasCtx.newValue());
+
                 int partCnt = tblCfg.partitions().value();
 
                 int newReplicas = replicasCtx.newValue();
@@ -377,7 +380,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                     String partId = partitionRaftGroupName(((ExtendedTableConfiguration) tblCfg).id().value(), i);
 
                     futures[i] = updatePendingAssignmentsKeys(
-                            partId, baselineMgr.nodes(),
+                            tblCfg.name().value(), partId, baselineMgr.nodes(),
                             partCnt, newReplicas,
                             replicasCtx.storageRevision(), metaStorageMgr, i);
                 }
@@ -1277,11 +1280,17 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             ? ((List<List<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part)
                             : (List<ClusterNode>) ByteUtils.fromBytes(stableAssignments);
 
+                    ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
+
                     var deltaPeers = newPeers.stream()
                             .filter(p -> !assignments.contains(p))
                             .collect(Collectors.toList());
 
                     try {
+                        LOG.info("Received update on pending key={} for partition={}, table={}, "
+                                        + "check if current node={} should start new raft group node for partition rebalance.",
+                                pendingAssignmentsWatchEvent.key(), part, tbl.name(), localMember.address());
+
                         raftMgr.startRaftGroupNode(partId, assignments, deltaPeers, raftGrpLsnrSupplier,
                                 raftGrpEvtsLsnrSupplier);
                     } catch (NodeStoppingException e) {
@@ -1300,10 +1309,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                     IgniteBiTuple<Peer, Long> leaderWithTerm = partGrpSvc.refreshAndGetLeaderWithTerm().join();
 
-                    ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
-
                     // run update of raft configuration if this node is a leader
                     if (localMember.address().equals(leaderWithTerm.get1().address())) {
+                        LOG.info("Current node={} is the leader of partition raft group={}. "
+                                        + "Initiate rebalance process for partition={}, table={}",
+                                localMember.address(), partId, part, tbl.name());
+
                         partGrpSvc.changePeersAsync(newNodes, leaderWithTerm.get2()).join();
                     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index d8df8b066..743b65844 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -140,6 +140,10 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
                     if (!pendingEntry.empty()) {
                         List<ClusterNode> pendingNodes = (List<ClusterNode>) ByteUtils.fromBytes(pendingEntry.value());
 
+                        LOG.info("New leader was elected for the raft group={} "
+                                        + "of partition={}, table={} and pending reconfiguration to peers={} was discovered",
+                                partId, partNum, tblConfiguration.name().value(), pendingNodes);
+
                         movePartitionFn.apply(clusterNodesToPeers(pendingNodes), term).join();
                     }
                 } catch (InterruptedException | ExecutionException e) {
@@ -269,16 +273,30 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
                                 remove(plannedPartAssignmentsKey(partId)))
                                 .yield(true),
                         ops().yield(false))).get().getAsBoolean()) {
+                    LOG.info("Planned key={} was changed, while trying to update rebalance information about partition={}, table={} "
+                            + "to peers={}, another attempt will be made",
+                            plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+
                     doOnNewPeersConfigurationApplied(peers);
                 }
+
+                LOG.info("Finished rebalance of partition={}, table={} to peers={} and queued new rebalance to peers={}",
+                        partNum, tblConfiguration.name().value(), appliedPeers, ByteUtils.fromBytes(plannedEntry.value()));
             } else {
                 if (!metaStorageMgr.invoke(If.iif(
                         notExists(plannedPartAssignmentsKey(partId)),
                         ops(put(stablePartAssignmentsKey(partId), ByteUtils.toBytes(appliedPeers)),
                                 remove(pendingPartAssignmentsKey(partId))).yield(true),
                         ops().yield(false))).get().getAsBoolean()) {
+                    LOG.info("Planned key={} was changed, while trying to update rebalance information about partition={}, table={} "
+                                    + "to peers={}, another attempt will be made",
+                            plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+
                     doOnNewPeersConfigurationApplied(peers);
                 }
+
+                LOG.info("Finished rebalance of partition={}, table={} to peers={} and no new rebalance in planned key={} discovered",
+                        partNum, tblConfiguration.name().value(), appliedPeers, plannedPartAssignmentsKey(partId));
             }
 
             rebalanceAttempts.set(0);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index 47e253746..8d20a89e4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -31,9 +31,9 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.client.If;
-import org.apache.ignite.internal.metastorage.client.StatementResult;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.NotNull;
 
@@ -42,9 +42,33 @@ import org.jetbrains.annotations.NotNull;
  */
 public class RebalanceUtil {
 
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(RebalanceUtil.class);
+
+    /** Return code of metastore multi-invoke which identifies
+     * that the pending key was updated to a new value (i.e. there is no active rebalance at the moment of the call).
+     */
+    private static final int PENDING_KEY_UPDATED = 0;
+
+    /** Return code of metastore multi-invoke which identifies
+     * that the planned key was updated to a new value (i.e. there is an active rebalance at the moment of the call).
+     */
+    private static final int PLANNED_KEY_UPDATED = 1;
+
+    /** Return code of metastore multi-invoke which identifies
+     * that the planned key was removed because the current rebalance already has the same target.
+     */
+    private static final int PLANNED_KEY_REMOVED = 2;
+
+    /** Return code of metastore multi-invoke which identifies
+     * that this trigger event was already processed by another node and must be skipped.
+     */
+    private static final int OUTDATED_UPDATE_RECEIVED = 3;
+
     /**
      * Update keys that related to rebalance algorithm in Meta Storage. Keys are specific for partition.
      *
+     * @param tableName Table name.
      * @param partId Unique identifier of a partition.
      * @param baselineNodes Nodes in baseline.
      * @param partitions Number of partitions in a table.
@@ -53,8 +77,8 @@ public class RebalanceUtil {
      * @param metaStorageMgr Meta Storage manager.
      * @return Future representing result of updating keys in {@code metaStorageMgr}
      */
-    public static @NotNull CompletableFuture<StatementResult> updatePendingAssignmentsKeys(
-            String partId, Collection<ClusterNode> baselineNodes,
+    public static @NotNull CompletableFuture<Void> updatePendingAssignmentsKeys(
+            String tableName, String partId, Collection<ClusterNode> baselineNodes,
             int partitions, int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum) {
         ByteArray partChangeTriggerKey = partChangeTriggerKey(partId);
 
@@ -84,16 +108,47 @@ public class RebalanceUtil {
                         ops(
                                 put(partAssignmentsPendingKey, partAssignmentsBytes),
                                 put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
-                        ).yield(),
+                        ).yield(PENDING_KEY_UPDATED),
                         If.iif(value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
                                 ops(
                                         put(partAssignmentsPlannedKey, partAssignmentsBytes),
                                         put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
-                                ).yield(),
-                                ops(remove(partAssignmentsPlannedKey)).yield())),
-                ops().yield());
-
-        return metaStorageMgr.invoke(iif);
+                                ).yield(PLANNED_KEY_UPDATED),
+                                ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED))),
+                ops().yield(OUTDATED_UPDATE_RECEIVED));
+
+        return metaStorageMgr.invoke(iif).thenAccept(sr -> {
+            switch (sr.getAsInt()) {
+                case PENDING_KEY_UPDATED:
+                    LOG.info(
+                            "Update metastore pending partitions key={} for partition={}, table={} to {}",
+                            partAssignmentsPendingKey.toString(), partNum, tableName,
+                            ByteUtils.fromBytes(partAssignmentsBytes));
+
+                    break;
+                case PLANNED_KEY_UPDATED:
+                    LOG.info(
+                            "Update metastore planned partitions key={} for partition={}, table={} to {}",
+                            partAssignmentsPlannedKey, partNum, tableName, ByteUtils.fromBytes(partAssignmentsBytes));
+
+                    break;
+                case PLANNED_KEY_REMOVED:
+                    LOG.info(
+                            "Remove planned key={} for partition={}, table={} due to the fact, "
+                                    + "that current pending key has the same value as planned={}",
+                            partAssignmentsPlannedKey.toString(), partNum, tableName, ByteUtils.fromBytes(partAssignmentsBytes));
+
+                    break;
+                case OUTDATED_UPDATE_RECEIVED:
+                    LOG.debug(
+                            "Received outdated rebalance trigger event with revision={} for partition={}, table={}",
+                            revision, partNum, tableName);
+
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown return code for rebalance metastore multi-invoke");
+            }
+        });
     }
 
     /** Key prefix for pending assignments. */