Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/11/25 14:52:30 UTC

[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1366: IGNITE-18172 Add learners' assignments to the rebalance algorithm

ibessonov commented on code in PR #1366:
URL: https://github.com/apache/ignite-3/pull/1366#discussion_r1032382467


##########
modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java:
##########
@@ -34,7 +35,7 @@
 public class AffinityServiceTest {
     @Test
     public void testCalculatedAssignmentHappyPath() {
-        List<List<ClusterNode>> assignments = AffinityUtils.calculateAssignments(
+        List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments(

Review Comment:
   Maybe I'm asking too much and it's inappropriate at this time, but would it make sense to introduce a class like "Assignments" that holds the list of sets?
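
A minimal sketch of what such a wrapper might look like. `Assignments`, `partitions`, `forPartition` and `peerIds` are hypothetical names, and `Assignment` is a simplified record stand-in for the class added in this PR (records need Java 16+):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified stand-in for org.apache.ignite.internal.affinity.Assignment from this PR.
record Assignment(String consistentId, boolean isPeer) {
    static Assignment forPeer(String consistentId) {
        return new Assignment(consistentId, true);
    }

    static Assignment forLearner(String consistentId) {
        return new Assignment(consistentId, false);
    }
}

// Hypothetical wrapper: one immutable Set<Assignment> per partition, indexed by partition ID.
final class Assignments {
    private final List<Set<Assignment>> byPartition;

    Assignments(List<Set<Assignment>> byPartition) {
        // Defensive, unmodifiable copies so callers cannot mutate the stored assignments.
        this.byPartition = byPartition.stream()
                .map(Set::copyOf)
                .collect(Collectors.toUnmodifiableList());
    }

    int partitions() {
        return byPartition.size();
    }

    Set<Assignment> forPartition(int partId) {
        return byPartition.get(partId);
    }

    // Consistent IDs of the voting members (peers) of a partition, sorted for deterministic output.
    List<String> peerIds(int partId) {
        return byPartition.get(partId).stream()
                .filter(Assignment::isPeer)
                .map(Assignment::consistentId)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

A wrapper like this would also give a natural home for the peer/learner split that the `TableManager` hunk below performs inline.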



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1759,95 +1741,95 @@ public boolean onUpdate(@NotNull WatchEvent evt) {
 
                     TablePartitionId replicaGrpId = new TablePartitionId(tblId, partId);
 
-                    // Assignments of the pending rebalance that we received through the meta storage watch mechanism.
-                    Set<ClusterNode> newPeers = ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
-
-                    var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)).join();
+                    Entry pendingAssignmentsEntry = metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)).join();
 
-                    assert pendingAssignmentsWatchEvent.revision() <= pendingAssignments.revision()
+                    assert pendingAssignmentsWatchEvent.revision() <= pendingAssignmentsEntry.revision()
                             : "Meta Storage watch cannot notify about an event with the revision that is more than the actual revision.";
 
+                    // Assignments of the pending rebalance that we received through the meta storage watch mechanism.
+                    Set<Assignment> pendingAssignments = ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
+
                     TableImpl tbl = tablesByIdVv.latest().get(tblId);
 
                     ExtendedTableConfiguration tblCfg = (ExtendedTableConfiguration) tablesCfg.tables().get(tbl.name());
 
                     // Stable assignments from the meta store, which revision is bounded by the current pending event.
-                    byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId),
+                    byte[] stableAssignmentsBytes = metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId),
                             pendingAssignmentsWatchEvent.revision()).join().value();
 
-                    Set<ClusterNode> assignments = stableAssignments == null
+                    Set<Assignment> stableAssignments = stableAssignmentsBytes == null
                             // This is for the case when the first rebalance occurs.
-                            ? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
-                            : ByteUtils.fromBytes(stableAssignments);
+                            ? ((List<Set<Assignment>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
+                            : ByteUtils.fromBytes(stableAssignmentsBytes);
+
+                    List<String> stablePeers = new ArrayList<>();
+                    List<String> stableLearners = new ArrayList<>();
+
+                    for (Assignment assignment : stableAssignments) {
+                        if (assignment.isPeer()) {
+                            stablePeers.add(assignment.consistentId());
+                        } else {
+                            stableLearners.add(assignment.consistentId());
+                        }
+                    }
 
-                    placementDriver.updateAssignment(replicaGrpId, assignments);
+                    placementDriver.updateAssignment(replicaGrpId, stablePeers);
 
                     ClusterNode localMember = raftMgr.topologyService().localMember();
 
-                    List<ClusterNode> deltaPeers = newPeers.stream()
-                            .filter(p -> !assignments.contains(p))
-                            .collect(toList());
+                    // Start a new Raft node and Replica if this node has appeared in the new assignments.
+                    boolean shouldStartLocalServices = pendingAssignments.stream()
+                            .filter(assignment -> !stableAssignments.contains(assignment))
+                            .anyMatch(assignment -> localMember.name().equals(assignment.consistentId()));

Review Comment:
   Not your code, I know, but swapping these two predicates (matching the local node first, then checking that its assignment is not in the stable set) would make much more sense, to me at least.
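
With the predicates swapped, the check first picks the local node's pending assignment and only then tests whether it is missing from the stable set. A sketch under simplifying assumptions: `Assignment` is a record stand-in for the class in this PR, and `LocalServicesCheck.shouldStartLocalServices` is a hypothetical helper, not a method of `TableManager`:

```java
import java.util.Set;

// Simplified stand-in for the Assignment class added in this PR.
record Assignment(String consistentId, boolean isPeer) {
}

final class LocalServicesCheck {
    // Hypothetical helper mirroring the hunk above with the two predicates swapped:
    // keep only the local node's pending assignment, then require that it is not already stable.
    static boolean shouldStartLocalServices(Set<Assignment> pending, Set<Assignment> stable, String localName) {
        return pending.stream()
                .filter(assignment -> localName.equals(assignment.consistentId()))
                .anyMatch(assignment -> !stable.contains(assignment));
    }
}
```

Both orders return the same result, since each asks whether some pending assignment is both local and not stable; the swapped order just narrows to the local node first. Because `Assignment.equals` also compares `isPeer`, a node that moves from learner to peer counts as a new assignment in either version.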



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java:
##########
@@ -366,43 +360,36 @@ public void before() throws Exception {
      */
     protected Int2ObjectOpenHashMap<RaftGroupService> startTable(String name, UUID tblId)
             throws Exception {
-        List<List<ClusterNode>> assignment = RendezvousAffinityFunction.assignPartitions(
-                cluster.stream().map(node -> node.topologyService().localMember())
-                        .collect(toList()),
+        List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments(
+                cluster.stream().map(node -> node.topologyService().localMember()).collect(toList()),
                 1,
-                replicas(),
-                false,
-                null
+                replicas()
         );
 
-        Map<ClusterNode, Function<Peer, Boolean>> isLocalPeerCheckerList = cluster.stream()
-                .map(ClusterService::topologyService)
-                .collect(toMap(
-                        TopologyService::localMember,
-                        ts -> peer -> ts.getByConsistentId(peer.consistentId()).equals(ts.localMember())
-                ));
-
         Int2ObjectOpenHashMap<RaftGroupService> clients = new Int2ObjectOpenHashMap<>();
 
         List<CompletableFuture<Void>> partitionReadyFutures = new ArrayList<>();
 
-        for (int p = 0; p < assignment.size(); p++) {
-            List<ClusterNode> partNodes = assignment.get(p);
+        for (int p = 0; p < assignments.size(); p++) {
+            Set<Assignment> partAssignments = assignments.get(p);
 
             TablePartitionId grpId = new TablePartitionId(tblId, p);
 
-            List<Peer> conf = partNodes.stream().map(n -> new Peer(n.name()))
-                    .collect(toList());
+            List<Peer> conf = partAssignments.stream().map(a -> new Peer(a.consistentId())).collect(toList());
+
+            for (Assignment assignment : partAssignments) {
+                String nodeName = assignment.consistentId();
 
-            for (ClusterNode node : partNodes) {
                 var testMpPartStorage = new TestMvPartitionStorage(0);
                 var txStateStorage = new TestTxStateStorage();
-                var placementDriver = new PlacementDriver(replicaServices.get(node));
+                var placementDriver = new PlacementDriver(replicaServices.get(nodeName), consistentIdToNode);
 
-                for (int part = 0; part < assignment.size(); part++) {
+                for (int part = 0; part < assignments.size(); part++) {
                     ReplicationGroupId replicaGrpId = new TablePartitionId(tblId, part);
 
-                    placementDriver.updateAssignment(replicaGrpId, assignment.get(part));
+                    List<String> replicaAssignment = assignments.get(part).stream().map(Assignment::consistentId).collect(toList());
+
+                    placementDriver.updateAssignment(replicaGrpId, replicaAssignment);

Review Comment:
   Should the placement driver also use the new class? Maybe in the future?
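
If the placement driver did take the new class, one possible shape is an `updateAssignment` overload that accepts assignments and keeps only the peers, so callers such as `TableManager` and this test would no longer convert to consistent IDs themselves. This is a sketch only: `PlacementDriverSketch`, its string group IDs and its internal map are hypothetical simplifications, since the real `PlacementDriver` internals are not part of this diff.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Simplified stand-in for the Assignment class added in this PR.
record Assignment(String consistentId, boolean isPeer) {
}

// Hypothetical, heavily simplified placement driver keyed by a string group ID.
final class PlacementDriverSketch {
    private final Map<String, List<String>> peersByGroup = new ConcurrentHashMap<>();

    // Current style: the caller passes already-extracted consistent IDs.
    void updateAssignment(String groupId, List<String> peerConsistentIds) {
        peersByGroup.put(groupId, List.copyOf(peerConsistentIds));
    }

    // Hypothetical overload: accepts the new Assignment type and keeps only the voting peers,
    // so learners are never handed to the driver. IDs are sorted for deterministic output.
    void updateAssignment(String groupId, Collection<Assignment> assignments) {
        List<String> peerIds = assignments.stream()
                .filter(Assignment::isPeer)
                .map(Assignment::consistentId)
                .sorted()
                .collect(Collectors.toList());

        updateAssignment(groupId, peerIds);
    }

    List<String> peers(String groupId) {
        return peersByGroup.getOrDefault(groupId, List.of());
    }
}
```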



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignment.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.io.Serializable;
+
+/**
+ * Represent an assignment of a partition to a node with a specific {@code consistentId}.
+ *
+ * <p>There can be two types of assignments: one for the voting members of the partition Raft group (a.k.a. "peers") and one for
+ * the learners of the same group.
+ */
+public class Assignment implements Serializable {
+    private static final long serialVersionUID = -8892379245627437834L;
+
+    private final String consistentId;
+
+    private final boolean isPeer;
+
+    private Assignment(String consistentId, boolean isPeer) {
+        this.consistentId = consistentId;
+        this.isPeer = isPeer;
+    }
+
+    /**
+     * Creates a peer assignment.
+     *
+     * @param consistentId Peer consistent ID.
+     */
+    public static Assignment forPeer(String consistentId) {
+        return new Assignment(consistentId, true);
+    }
+
+    /**
+     * Creates a learner assignment.
+     *
+     * @param consistentId Learner consistent ID.
+     */
+    public static Assignment forLearner(String consistentId) {
+        return new Assignment(consistentId, false);
+    }
+
+    public String consistentId() {
+        return consistentId;
+    }
+
+    public boolean isPeer() {
+        return isPeer;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Assignment that = (Assignment) o;
+
+        if (isPeer != that.isPeer) {
+            return false;
+        }
+        return consistentId.equals(that.consistentId);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = consistentId.hashCode();
+        result = 31 * result + (isPeer ? 1 : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "Assignment{"

Review Comment:
   This is IDEA's default toString format; we usually use brackets instead of braces.
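
A hand-written, bracket-style `toString` for the class could look like this. The class is trimmed to the two fields and factories from the diff, and the `ClassName [field=value, ...]` layout is inferred from the comment rather than taken from a project helper:

```java
// Trimmed copy of the Assignment class from the diff, showing only a bracket-style toString().
final class Assignment {
    private final String consistentId;

    private final boolean isPeer;

    private Assignment(String consistentId, boolean isPeer) {
        this.consistentId = consistentId;
        this.isPeer = isPeer;
    }

    static Assignment forPeer(String consistentId) {
        return new Assignment(consistentId, true);
    }

    static Assignment forLearner(String consistentId) {
        return new Assignment(consistentId, false);
    }

    @Override
    public String toString() {
        // "ClassName [field=value, ...]" instead of IDEA's default "ClassName{field=value, ...}".
        return "Assignment [consistentId=" + consistentId + ", isPeer=" + isPeer + ']';
    }
}
```

With this version, `Assignment.forPeer("node0")` prints as `Assignment [consistentId=node0, isPeer=true]`.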



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java:
##########
@@ -366,43 +360,36 @@ public void before() throws Exception {
      */
     protected Int2ObjectOpenHashMap<RaftGroupService> startTable(String name, UUID tblId)
             throws Exception {
-        List<List<ClusterNode>> assignment = RendezvousAffinityFunction.assignPartitions(
-                cluster.stream().map(node -> node.topologyService().localMember())
-                        .collect(toList()),
+        List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments(
+                cluster.stream().map(node -> node.topologyService().localMember()).collect(toList()),
                 1,
-                replicas(),
-                false,
-                null
+                replicas()
         );
 
-        Map<ClusterNode, Function<Peer, Boolean>> isLocalPeerCheckerList = cluster.stream()
-                .map(ClusterService::topologyService)
-                .collect(toMap(
-                        TopologyService::localMember,
-                        ts -> peer -> ts.getByConsistentId(peer.consistentId()).equals(ts.localMember())
-                ));
-
         Int2ObjectOpenHashMap<RaftGroupService> clients = new Int2ObjectOpenHashMap<>();
 
         List<CompletableFuture<Void>> partitionReadyFutures = new ArrayList<>();
 
-        for (int p = 0; p < assignment.size(); p++) {
-            List<ClusterNode> partNodes = assignment.get(p);
+        for (int p = 0; p < assignments.size(); p++) {
+            Set<Assignment> partAssignments = assignments.get(p);
 
             TablePartitionId grpId = new TablePartitionId(tblId, p);
 
-            List<Peer> conf = partNodes.stream().map(n -> new Peer(n.name()))
-                    .collect(toList());
+            List<Peer> conf = partAssignments.stream().map(a -> new Peer(a.consistentId())).collect(toList());
+
+            for (Assignment assignment : partAssignments) {
+                String nodeName = assignment.consistentId();
 
-            for (ClusterNode node : partNodes) {
                 var testMpPartStorage = new TestMvPartitionStorage(0);
                 var txStateStorage = new TestTxStateStorage();
-                var placementDriver = new PlacementDriver(replicaServices.get(node));
+                var placementDriver = new PlacementDriver(replicaServices.get(nodeName), consistentIdToNode);
 
-                for (int part = 0; part < assignment.size(); part++) {
+                for (int part = 0; part < assignments.size(); part++) {

Review Comment:
   I know it's not your code, but do you understand what's going on? I don't think I do.
   There are three nested loops, and two of them iterate over all partition IDs. Maybe this test deserves a little refactoring to remove the innermost loop, if it really makes no sense.
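
As far as the hunk shows, the innermost loop does not depend on `p` or on the current node: every placement driver receives the assignments of all partitions, recomputed for each (partition, node) pair. A minimal sketch of hoisting that loop-invariant work out and moving the remaining loop into a helper, with `RecordingPlacementDriver`, `StartTableSketch` and integer partition keys as hypothetical stand-ins for the test's real types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified stand-in for the Assignment class added in this PR.
record Assignment(String consistentId, boolean isPeer) {
}

// Hypothetical stand-in for PlacementDriver that only records the assignments it receives.
final class RecordingPlacementDriver {
    final String nodeName;

    final Map<Integer, List<String>> assignmentsByPartition = new HashMap<>();

    RecordingPlacementDriver(String nodeName) {
        this.nodeName = nodeName;
    }

    void updateAssignment(int partId, List<String> consistentIds) {
        assignmentsByPartition.put(partId, consistentIds);
    }
}

final class StartTableSketch {
    static List<RecordingPlacementDriver> createDrivers(List<Set<Assignment>> assignments) {
        // Loop-invariant work, done once: sorted consistent IDs of every partition, indexed by partition ID.
        List<List<String>> idsByPartition = assignments.stream()
                .map(part -> part.stream().map(Assignment::consistentId).sorted().collect(Collectors.toList()))
                .collect(Collectors.toList());

        List<RecordingPlacementDriver> drivers = new ArrayList<>();

        // One driver per (partition, assigned node) pair, as in the test.
        for (Set<Assignment> partAssignments : assignments) {
            for (Assignment assignment : partAssignments) {
                drivers.add(newDriver(assignment.consistentId(), idsByPartition));
            }
        }

        return drivers;
    }

    // Replaces the innermost loop: every driver learns the assignment of every partition.
    private static RecordingPlacementDriver newDriver(String nodeName, List<List<String>> idsByPartition) {
        var driver = new RecordingPlacementDriver(nodeName);

        for (int part = 0; part < idsByPartition.size(); part++) {
            driver.updateAssignment(part, idsByPartition.get(part));
        }

        return driver;
    }
}
```

If each driver is in fact only used for partition `p`, the helper could shrink to a single `updateAssignment` call for that partition; whether that holds depends on how the drivers are used later in the test, which this hunk does not show.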



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
