Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/06/25 12:14:14 UTC

[GitHub] [ozone] lokeshj1703 commented on a change in pull request #2349: HDDS-4928. Support container move in Replication Manager

lokeshj1703 commented on a change in pull request #2349:
URL: https://github.com/apache/ozone/pull/2349#discussion_r658648587



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -134,6 +139,44 @@
    */
   private final Map<ContainerID, List<InflightAction>> inflightDeletion;
 
+  /**
+   * This is used for tracking container move commands
+   * which are not yet complete.
+   */
+  private final Map<ContainerID,
+      Pair<DatanodeDetails, DatanodeDetails>> inflightMove;
+
+  /**
+   * This is used for indicating the result of move option and
+   * the corresponding reason. this is useful for tracking
+   * the result of move option
+   */
+  enum MoveResult {
+    // both replication and deletion are completed
+    COMPLETED,
+    // replication fail because of timeout
+    REPLICATION_FAIL_TIME_OUT,
+    // replication fail because node is unhealthy
+    REPLICATION_FAIL_NODE_UNHEALTHY,
+    // replication succeed, but deletion fail because of timeout
+    DELETION_FAIL_TIME_OUT,
+    // replication succeed, but deletion fail because because
+    // node is unhealthy
+    DELETION_FAIL_NODE_UNHEALTHY,
+    // replication succeed, but if we delete the container from
+    // the source datanode , the policy(eg, replica num or
+    // rack location) will not be satisfied, so we should not delete
+    // the container
+    DELETE_FAIL_POLICY
+  }

Review comment:
       It would also be useful to have metrics for these move results.
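
A minimal sketch of what per-result counters could look like. `MoveResultMetrics` and its methods are hypothetical names, not part of Ozone; the nested enum only mirrors the one in the diff above. A real patch would more likely plug into the metrics framework the project already uses.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper: one counter per MoveResult value.
final class MoveResultMetrics {
  // Mirrors the enum from the diff; repeated here so the sketch compiles alone.
  enum MoveResult {
    COMPLETED,
    REPLICATION_FAIL_TIME_OUT,
    REPLICATION_FAIL_NODE_UNHEALTHY,
    DELETION_FAIL_TIME_OUT,
    DELETION_FAIL_NODE_UNHEALTHY,
    DELETE_FAIL_POLICY
  }

  // The map is fully populated in the constructor and never resized,
  // so only the LongAdder values change afterwards.
  private final Map<MoveResult, LongAdder> counters =
      new EnumMap<>(MoveResult.class);

  MoveResultMetrics() {
    for (MoveResult r : MoveResult.values()) {
      counters.put(r, new LongAdder());
    }
  }

  // Call once when a move finishes with the given result.
  void record(MoveResult result) {
    counters.get(result).increment();
  }

  // Current count for one result.
  long get(MoveResult result) {
    return counters.get(result).sum();
  }
}
```

The idea would be to call `record(...)` at the same place where the move future is completed, so the counters and the futures cannot drift apart.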

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -471,6 +549,127 @@ private void updateInflightAction(final ContainerInfo container,
     }
   }
 
+  /**
+   * add a move action for a given container.
+   *
+   * @param cid Container to move
+   * @param srcDn datanode to move from
+   * @param targetDn datanode to move to
+   */
+  public Optional<CompletableFuture<MoveResult>> move(ContainerID cid,
+      DatanodeDetails srcDn, DatanodeDetails targetDn)
+      throws ContainerNotFoundException, NodeNotFoundException {
+    LOG.info("receive a move requset about container {} , from {} to {}",
+        cid, srcDn.getUuid(), targetDn.getUuid());
+    Optional<CompletableFuture<MoveResult>> ret = Optional.empty();
+    if (!isRunning()) {
+      LOG.info("Replication Manager in not running. please start it first");
+      return ret;
+    }
+
+    /*
+     * make sure the flowing conditions are met:
+     *  1 the given two datanodes are in healthy state
+     *  2 the given container exists on the given source datanode
+     *  3 the given container does not exist on the given target datanode
+     *  4 the given container is in closed state
+     *  5 the giver container is not taking any inflight action
+     *  6 the given two datanodes are in IN_SERVICE state
+     *
+     * move is a combination of two steps : replication and deletion.
+     * if the conditions above are all met, then we take a conservative
+     * strategy here : replication can always be executed, but the execution
+     * of deletion always depends on placement policy
+     */
+
+    NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn);
+    NodeState healthStat = currentNodeStat.getHealth();
+    NodeOperationalState operationalState =
+        currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      LOG.info("given source datanode is in {} state, " +
+          "not in HEALTHY state", healthStat);

Review comment:
       We can remove the log and either complete the future exceptionally or use a MoveResult for each of the different errors in this function.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -862,16 +1061,55 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
           break;
         }
       }
+
+      eligibleReplicas.removeAll(unhealthyReplicas);
+      boolean isInMove = inflightMove.containsKey(id);
+      boolean isSourceDnInReplicaSet = false;
+      boolean isTargetDnInReplicaSet = false;
+
+      if (isInMove) {
+        Pair<DatanodeDetails, DatanodeDetails> movePair =
+            inflightMove.get(id);
+        final DatanodeDetails sourceDN = movePair.getKey();
+        isSourceDnInReplicaSet = eligibleReplicas.stream()
+            .anyMatch(r -> r.getDatanodeDetails().equals(sourceDN));
+        isTargetDnInReplicaSet = eligibleReplicas.stream()
+            .anyMatch(r -> r.getDatanodeDetails()
+                .equals(movePair.getValue()));
+        int sourceDnPos = 0;
+        for (int i = 0; i < eligibleReplicas.size(); i++) {
+          if (eligibleReplicas.get(i).getDatanodeDetails()
+              .equals(sourceDN)) {
+            sourceDnPos = i;
+            break;
+          }
+        }

Review comment:
       We do not need to calculate `sourceDnPos`. We can simply call
   `eligibleReplicas.remove(sourceDN)`.
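
One caveat worth checking: `List.remove(Object)` compares its argument against the list elements themselves. The diff calls `getDatanodeDetails()` on each element, which suggests the list holds replica objects rather than datanodes, so a predicate-based removal may be needed. The sketch below uses a stand-in `Replica` class and string datanode IDs; both are assumptions for illustration, not Ozone types.

```java
import java.util.List;

final class RemoveSourceReplicaSketch {
  // Stand-in for ContainerReplica; only the hosting datanode is modeled.
  static final class Replica {
    private final String datanode;

    Replica(String datanode) {
      this.datanode = datanode;
    }

    String getDatanode() {
      return datanode;
    }
  }

  // Removes every replica hosted on sourceDn.
  // Returns true if at least one replica was removed.
  static boolean removeSource(List<Replica> eligibleReplicas,
      String sourceDn) {
    return eligibleReplicas.removeIf(
        r -> r.getDatanode().equals(sourceDn));
  }
}
```

The boolean return value also tells the caller whether the source datanode was present at all, without a separate index search.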

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -471,6 +549,127 @@ private void updateInflightAction(final ContainerInfo container,
     }
   }
 
+  /**
+   * add a move action for a given container.
+   *
+   * @param cid Container to move
+   * @param srcDn datanode to move from
+   * @param targetDn datanode to move to
+   */
+  public Optional<CompletableFuture<MoveResult>> move(ContainerID cid,

Review comment:
       I think it would be better to always return a CompletableFuture. In the cases where the move is not accepted, the future can be completed exceptionally with an appropriate exception message. Alternatively, we could add more MoveResult values for the errors.
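
A rough sketch of the two options, assuming a simplified `move` that takes boolean preconditions instead of the real arguments. `MoveFutureSketch` and its helper methods are hypothetical, and the enum is cut down to two values for brevity.

```java
import java.util.concurrent.CompletableFuture;

final class MoveFutureSketch {
  // Reduced copy of the enum from the diff, for illustration only.
  enum MoveResult { COMPLETED, REPLICATION_FAIL_NODE_UNHEALTHY }

  // Option A: reject by completing the future exceptionally.
  static CompletableFuture<MoveResult> rejectExceptionally(String reason) {
    CompletableFuture<MoveResult> future = new CompletableFuture<>();
    future.completeExceptionally(new IllegalStateException(reason));
    return future;
  }

  // Option B: reject with a dedicated MoveResult value.
  static CompletableFuture<MoveResult> rejectWithResult(MoveResult result) {
    return CompletableFuture.completedFuture(result);
  }

  // Simplified stand-in for move(): the caller always gets a future.
  static CompletableFuture<MoveResult> move(boolean running,
      boolean sourceHealthy) {
    if (!running) {
      return rejectExceptionally("Replication Manager is not running");
    }
    if (!sourceHealthy) {
      return rejectWithResult(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
    }
    // Accepted: the future stays pending until the move finishes.
    return new CompletableFuture<>();
  }
}
```

Either way the caller no longer has to unwrap an `Optional` first; it can attach `whenComplete` or `exceptionally` handlers directly to the returned future.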

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -471,6 +549,127 @@ private void updateInflightAction(final ContainerInfo container,
     }
   }
 
+  /**
+   * add a move action for a given container.
+   *
+   * @param cid Container to move
+   * @param srcDn datanode to move from
+   * @param targetDn datanode to move to
+   */
+  public Optional<CompletableFuture<MoveResult>> move(ContainerID cid,
+      DatanodeDetails srcDn, DatanodeDetails targetDn)
+      throws ContainerNotFoundException, NodeNotFoundException {
+    LOG.info("receive a move requset about container {} , from {} to {}",
+        cid, srcDn.getUuid(), targetDn.getUuid());
+    Optional<CompletableFuture<MoveResult>> ret = Optional.empty();
+    if (!isRunning()) {
+      LOG.info("Replication Manager in not running. please start it first");
+      return ret;
+    }
+
+    /*
+     * make sure the flowing conditions are met:
+     *  1 the given two datanodes are in healthy state
+     *  2 the given container exists on the given source datanode
+     *  3 the given container does not exist on the given target datanode
+     *  4 the given container is in closed state
+     *  5 the giver container is not taking any inflight action
+     *  6 the given two datanodes are in IN_SERVICE state
+     *
+     * move is a combination of two steps : replication and deletion.
+     * if the conditions above are all met, then we take a conservative
+     * strategy here : replication can always be executed, but the execution
+     * of deletion always depends on placement policy
+     */
+
+    NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn);
+    NodeState healthStat = currentNodeStat.getHealth();
+    NodeOperationalState operationalState =
+        currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      LOG.info("given source datanode is in {} state, " +
+          "not in HEALTHY state", healthStat);

Review comment:
       Further, I think REPLICATION_FAIL_NODE_UNHEALTHY can be used here.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -458,6 +505,37 @@ private void updateInflightAction(final ContainerInfo container,
           if (health != NodeState.HEALTHY || a.time < deadline
               || filter.test(a)) {
             iter.remove();
+
+            if (inflightMove.containsKey(id)) {
+              boolean isInflightReplication =
+                  inflightActions.equals(inflightReplication);
+              //if replication is completed , nothing to do
+              if (!(isInflightReplication && filter.test(a))) {
+                if (isInflightReplication) {
+                  if (health != NodeState.HEALTHY) {
+                    inflightMoveFuture.get(id).complete(
+                        MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+                  } else {
+                    inflightMoveFuture.get(id).complete(
+                        MoveResult.REPLICATION_FAIL_TIME_OUT);
+                  }
+                } else {
+                  if (health != NodeState.HEALTHY) {
+                    inflightMoveFuture.get(id).complete(
+                        MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+                  } else if (a.time < deadline) {
+                    inflightMoveFuture.get(id).complete(
+                        MoveResult.DELETION_FAIL_TIME_OUT);
+                  } else {
+                    inflightMoveFuture.get(id).complete(
+                        MoveResult.COMPLETED);
+                  }

Review comment:
       I feel we can simplify this code and probably move it to a separate function like
   `updateMove(boolean isInflightReplication, boolean timeout, boolean success, NodeState health)`.
   We will also need to check whether a.datanode matches our required datanode.
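
One possible shape for such a helper, written as a pure function that mirrors the branch order of the diff above. The `NodeState` enum here is a reduced stand-in, the `Optional` return type is an assumption, and the datanode match is left out of the sketch.

```java
import java.util.Optional;

final class UpdateMoveSketch {
  // Reduced stand-in for the real NodeState enum.
  enum NodeState { HEALTHY, STALE, DEAD }

  enum MoveResult {
    COMPLETED,
    REPLICATION_FAIL_TIME_OUT,
    REPLICATION_FAIL_NODE_UNHEALTHY,
    DELETION_FAIL_TIME_OUT,
    DELETION_FAIL_NODE_UNHEALTHY
  }

  // Returns the result to complete the move future with, or empty when
  // the move is still in progress (replication succeeded, deletion pending).
  static Optional<MoveResult> updateMove(boolean isInflightReplication,
      boolean timeout, boolean success, NodeState health) {
    boolean healthy = health == NodeState.HEALTHY;
    if (isInflightReplication) {
      if (success) {
        // Replication finished; nothing to report yet.
        return Optional.empty();
      }
      // As in the diff, a healthy node without success means a timeout.
      return Optional.of(healthy
          ? MoveResult.REPLICATION_FAIL_TIME_OUT
          : MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
    }
    // Deletion phase: node health is checked first, then the timeout.
    if (!healthy) {
      return Optional.of(MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
    }
    return Optional.of(timeout
        ? MoveResult.DELETION_FAIL_TIME_OUT
        : MoveResult.COMPLETED);
  }
}
```

Keeping the decision in a side-effect-free function like this would make each branch easy to unit test; the caller would complete the future only when the returned `Optional` is present.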

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -471,6 +549,127 @@ private void updateInflightAction(final ContainerInfo container,
     }
   }
 
+  /**
+   * add a move action for a given container.
+   *
+   * @param cid Container to move
+   * @param srcDn datanode to move from
+   * @param targetDn datanode to move to
+   */
+  public Optional<CompletableFuture<MoveResult>> move(ContainerID cid,
+      DatanodeDetails srcDn, DatanodeDetails targetDn)
+      throws ContainerNotFoundException, NodeNotFoundException {
+    LOG.info("receive a move requset about container {} , from {} to {}",
+        cid, srcDn.getUuid(), targetDn.getUuid());

Review comment:
       Let's only log the successful cases.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -862,16 +1061,55 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
           break;
         }
       }
+
+      eligibleReplicas.removeAll(unhealthyReplicas);
+      boolean isInMove = inflightMove.containsKey(id);
+      boolean isSourceDnInReplicaSet = false;
+      boolean isTargetDnInReplicaSet = false;
+
+      if (isInMove) {
+        Pair<DatanodeDetails, DatanodeDetails> movePair =
+            inflightMove.get(id);
+        final DatanodeDetails sourceDN = movePair.getKey();
+        isSourceDnInReplicaSet = eligibleReplicas.stream()
+            .anyMatch(r -> r.getDatanodeDetails().equals(sourceDN));
+        isTargetDnInReplicaSet = eligibleReplicas.stream()
+            .anyMatch(r -> r.getDatanodeDetails()
+                .equals(movePair.getValue()));
+        int sourceDnPos = 0;
+        for (int i = 0; i < eligibleReplicas.size(); i++) {
+          if (eligibleReplicas.get(i).getDatanodeDetails()
+              .equals(sourceDN)) {
+            sourceDnPos = i;
+            break;
+          }
+        }
+        if (isTargetDnInReplicaSet) {
+          // if the container is in inflightMove and target datanode is
+          // included in the replicas, then swap the source datanode to
+          // first of the replica list if exists, so the source datanode
+          // will be first removed if possible.
+          eligibleReplicas.add(0, eligibleReplicas.remove(sourceDnPos));
+        } else {
+          // a container replica that being moved should not be removed.
+          // if the container is in inflightMove and target datanode is not
+          // included in the replicas, then swap the source datanode to the
+          // last of the replica list, so the source datanode will not
+          // be removed.
+          eligibleReplicas.add(eligibleReplicas.remove(sourceDnPos));

Review comment:
       I am not sure we should handle this. TargetDN would receive all the existing replicas as sources to copy from. I think that, at the end, if sourceDN is not present and targetDN is present, the move is successful.
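
The success condition described here can be written as a small predicate. This is only an illustration using string datanode IDs; `MoveCompletionSketch` and `isMoveSuccessful` are hypothetical names.

```java
import java.util.Set;

final class MoveCompletionSketch {
  // A move counts as successful once the target holds a replica
  // and the source no longer does.
  static boolean isMoveSuccessful(Set<String> replicaDatanodes,
      String sourceDn, String targetDn) {
    return replicaDatanodes.contains(targetDn)
        && !replicaDatanodes.contains(sourceDn);
  }
}
```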
   

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -471,6 +549,127 @@ private void updateInflightAction(final ContainerInfo container,
     }
   }
 
+  /**
+   * add a move action for a given container.
+   *
+   * @param cid Container to move
+   * @param srcDn datanode to move from
+   * @param targetDn datanode to move to
+   */
+  public Optional<CompletableFuture<MoveResult>> move(ContainerID cid,
+      DatanodeDetails srcDn, DatanodeDetails targetDn)
+      throws ContainerNotFoundException, NodeNotFoundException {
+    LOG.info("receive a move requset about container {} , from {} to {}",
+        cid, srcDn.getUuid(), targetDn.getUuid());
+    Optional<CompletableFuture<MoveResult>> ret = Optional.empty();
+    if (!isRunning()) {
+      LOG.info("Replication Manager in not running. please start it first");
+      return ret;
+    }
+
+    /*
+     * make sure the flowing conditions are met:
+     *  1 the given two datanodes are in healthy state
+     *  2 the given container exists on the given source datanode
+     *  3 the given container does not exist on the given target datanode
+     *  4 the given container is in closed state
+     *  5 the giver container is not taking any inflight action
+     *  6 the given two datanodes are in IN_SERVICE state
+     *
+     * move is a combination of two steps : replication and deletion.
+     * if the conditions above are all met, then we take a conservative
+     * strategy here : replication can always be executed, but the execution
+     * of deletion always depends on placement policy
+     */
+
+    NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn);
+    NodeState healthStat = currentNodeStat.getHealth();
+    NodeOperationalState operationalState =
+        currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      LOG.info("given source datanode is in {} state, " +
+          "not in HEALTHY state", healthStat);
+      return ret;
+    }
+    if (operationalState != NodeOperationalState.IN_SERVICE) {
+      LOG.info("given source datanode is in {} state, " +
+          "not in IN_SERVICE state", operationalState);
+      return ret;
+    }
+
+    currentNodeStat = nodeManager.getNodeStatus(targetDn);
+    healthStat = currentNodeStat.getHealth();
+    operationalState = currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      LOG.info("given target datanode is in {} state, " +
+          "not in HEALTHY state", healthStat);
+      return ret;

Review comment:
       DELETION_FAIL_NODE_UNHEALTHY can be used here.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -471,6 +549,127 @@ private void updateInflightAction(final ContainerInfo container,
     }
   }
 
+  /**
+   * add a move action for a given container.
+   *
+   * @param cid Container to move
+   * @param srcDn datanode to move from
+   * @param targetDn datanode to move to
+   */
+  public Optional<CompletableFuture<MoveResult>> move(ContainerID cid,
+      DatanodeDetails srcDn, DatanodeDetails targetDn)
+      throws ContainerNotFoundException, NodeNotFoundException {
+    LOG.info("receive a move requset about container {} , from {} to {}",
+        cid, srcDn.getUuid(), targetDn.getUuid());
+    Optional<CompletableFuture<MoveResult>> ret = Optional.empty();
+    if (!isRunning()) {
+      LOG.info("Replication Manager in not running. please start it first");
+      return ret;
+    }
+
+    /*
+     * make sure the flowing conditions are met:
+     *  1 the given two datanodes are in healthy state
+     *  2 the given container exists on the given source datanode
+     *  3 the given container does not exist on the given target datanode
+     *  4 the given container is in closed state
+     *  5 the giver container is not taking any inflight action
+     *  6 the given two datanodes are in IN_SERVICE state
+     *
+     * move is a combination of two steps : replication and deletion.
+     * if the conditions above are all met, then we take a conservative
+     * strategy here : replication can always be executed, but the execution
+     * of deletion always depends on placement policy
+     */
+
+    NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn);
+    NodeState healthStat = currentNodeStat.getHealth();
+    NodeOperationalState operationalState =
+        currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      LOG.info("given source datanode is in {} state, " +
+          "not in HEALTHY state", healthStat);
+      return ret;
+    }
+    if (operationalState != NodeOperationalState.IN_SERVICE) {
+      LOG.info("given source datanode is in {} state, " +
+          "not in IN_SERVICE state", operationalState);
+      return ret;
+    }
+
+    currentNodeStat = nodeManager.getNodeStatus(targetDn);
+    healthStat = currentNodeStat.getHealth();
+    operationalState = currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      LOG.info("given target datanode is in {} state, " +
+          "not in HEALTHY state", healthStat);
+      return ret;
+    }
+    if (operationalState != NodeOperationalState.IN_SERVICE) {
+      LOG.info("given target datanode is in {} state, " +
+          "not in IN_SERVICE state", operationalState);
+      return ret;
+    }
+
+    // we need to synchronize on ContainerInfo, since it is
+    // shared by ICR/FCR handler and this.processContainer
+    // TODO: use a Read lock after introducing a RW lock into ContainerInfo
+    ContainerInfo cif = containerManager.getContainer(cid);
+    synchronized (cif) {
+      final Set<DatanodeDetails> replicas = containerManager
+            .getContainerReplicas(cid).stream()
+            .map(ContainerReplica::getDatanodeDetails)
+            .collect(Collectors.toSet());
+      if (replicas.contains(targetDn)) {
+        LOG.info("given container exists in the target Datanode");
+        return ret;
+      }
+      if (!replicas.contains(srcDn)) {
+        LOG.info("given container does not exist in the source Datanode");
+        return ret;
+      }
+
+      /*
+      * the reason why the given container should not be taking any inflight
+      * action is that: if the given container is being replicated or deleted,
+      * the num of its replica is not deterministic, so move operation issued
+      * by balancer may cause a nondeterministic result, so we should drop
+      * this option for this time.
+      * */
+
+      if (inflightReplication.containsKey(cid)) {
+        LOG.info("given container is in inflight replication");
+        return ret;
+      }
+      if (inflightDeletion.containsKey(cid)) {
+        LOG.info("given container is in inflight deletion");
+        return ret;
+      }
+
+      /*
+      * here, no need to see whether cid is in inflightMove, because
+      * these three map are all synchronized on ContainerInfo, if cid
+      * is in infligtMove , it must now being replicated or deleted,
+      * so it must be in inflightReplication or in infligthDeletion.
+      * thus, if we can not find cid in both of them , this cid must
+      * not be in inflightMove.
+      */
+

Review comment:
       We should also check that {Existing replicas + Target_Dn - Source_Dn} satisfies the placement policy.
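
A sketch of that pre-check under simplifying assumptions: datanodes are plain strings, and the placement policy is modeled as a predicate over the resulting set of datanodes. The rack-count rule and all names below are hypothetical and only stand in for the real placement policy.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

final class MovePlacementCheckSketch {
  // Computes {existing replicas + target - source} without mutating the input.
  static Set<String> replicasAfterMove(Set<String> existing,
      String sourceDn, String targetDn) {
    Set<String> result = new HashSet<>(existing);
    result.remove(sourceDn);
    result.add(targetDn);
    return result;
  }

  // Toy policy: the replicas must span at least requiredRacks racks.
  static Predicate<Set<String>> rackPolicy(Map<String, String> rackOf,
      int requiredRacks) {
    return nodes -> nodes.stream()
        .map(rackOf::get)
        .distinct()
        .count() >= requiredRacks;
  }

  // True if the replica set that would exist after the move
  // still satisfies the given policy.
  static boolean moveKeepsPolicy(Set<String> existing, String sourceDn,
      String targetDn, Predicate<Set<String>> policy) {
    return policy.test(replicasAfterMove(existing, sourceDn, targetDn));
  }
}
```

Running this check before scheduling replication would let the move be rejected up front, instead of replicating first and only then discovering that the source replica cannot be deleted.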

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
##########
@@ -862,16 +1061,55 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
           break;
         }
       }
+
+      eligibleReplicas.removeAll(unhealthyReplicas);
+      boolean isInMove = inflightMove.containsKey(id);
+      boolean isSourceDnInReplicaSet = false;
+      boolean isTargetDnInReplicaSet = false;
+
+      if (isInMove) {
+        Pair<DatanodeDetails, DatanodeDetails> movePair =
+            inflightMove.get(id);
+        final DatanodeDetails sourceDN = movePair.getKey();
+        isSourceDnInReplicaSet = eligibleReplicas.stream()
+            .anyMatch(r -> r.getDatanodeDetails().equals(sourceDN));
+        isTargetDnInReplicaSet = eligibleReplicas.stream()
+            .anyMatch(r -> r.getDatanodeDetails()
+                .equals(movePair.getValue()));
+        int sourceDnPos = 0;
+        for (int i = 0; i < eligibleReplicas.size(); i++) {
+          if (eligibleReplicas.get(i).getDatanodeDetails()
+              .equals(sourceDN)) {
+            sourceDnPos = i;
+            break;
+          }
+        }
+        if (isTargetDnInReplicaSet) {
+          // if the container is in inflightMove and target datanode is
+          // included in the replicas, then swap the source datanode to
+          // first of the replica list if exists, so the source datanode
+          // will be first removed if possible.
+          eligibleReplicas.add(0, eligibleReplicas.remove(sourceDnPos));

Review comment:
       We should also handle the case when sourceDN is not present in eligibleReplicas.
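
In the diff, `sourceDnPos` defaults to 0, so when the source is absent the element at index 0 is treated as if it were the source. A sketch of a guarded version follows, using a list of string datanode IDs as a stand-in for the replica list; `SourceFirstSketch` is a hypothetical name.

```java
import java.util.List;

final class SourceFirstSketch {
  // Moves the source datanode to the front of the list.
  // Returns false and leaves the list untouched if the source is absent.
  static boolean moveSourceToFront(List<String> eligibleReplicas,
      String sourceDn) {
    int pos = eligibleReplicas.indexOf(sourceDn);
    if (pos < 0) {
      return false;
    }
    // remove(int) returns the removed element, which is re-inserted at 0.
    eligibleReplicas.add(0, eligibleReplicas.remove(pos));
    return true;
  }
}
```

The caller can then branch on the return value and skip the reordering entirely when the source replica is missing.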



