Posted to issues@ozone.apache.org by "sodonnel (via GitHub)" <gi...@apache.org> on 2023/02/14 18:46:26 UTC

[GitHub] [ozone] sodonnel opened a new pull request, #4274: Added moveManager and some RM helper methods

sodonnel opened a new pull request, #4274:
URL: https://github.com/apache/ozone/pull/4274

   ## What changes were proposed in this pull request?
   
   To allow the balancer to be split from the Legacy ReplicationManager, this PR adds a MoveManager class. The Balancer will hold a reference to an instance of this move manager and use it to schedule its moves, rather than calling the replication manager directly.
   
   As things stand, this MoveManager is not yet used. This PR simply adds the class along with some tests; a future PR will make the balancer start to use it.
   
   The MoveManager will keep track of the scheduled moves. When the "add" part of a move completes, the ContainerReplicaPendingOps service notifies the MoveManager, allowing it to schedule the delete part of the move if that is still appropriate.
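   
   For illustration, here is a minimal sketch of how a caller such as the balancer might use this API. Only `MoveManager#move` and the `MoveResult` values come from this PR (assuming `MoveResult` is the enum defined on the MoveManager interface); the surrounding variable names and logging are hypothetical:
   
   ```java
   // Hypothetical caller-side sketch; only move() and MoveResult are from
   // this PR, the rest is illustrative.
   CompletableFuture<MoveManager.MoveResult> future =
       moveManager.move(containerID, sourceDatanode, targetDatanode);
   
   future.thenAccept(result -> {
     if (result == MoveManager.MoveResult.COMPLETED) {
       LOG.info("Move of container {} completed", containerID);
     } else {
       // e.g. REPLICATION_FAIL_TIME_OUT, DELETE_FAIL_POLICY, ...
       LOG.warn("Move of container {} did not complete: {}", containerID, result);
     }
   });
   ```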
   
   To play nicely with the command deadlines introduced in the new RM, and with the load limiting yet to be added to the new RM, the MoveManager schedules commands on the datanodes with "low" priority and a longer deadline. This allows the balancer to schedule large batches of work on the DNs while not blocking replication commands related to decommission or under-replication. Replication commands for under-replication are scheduled with Normal priority, meaning they are processed before balancer commands.
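   
   A rough sketch of the deadline arithmetic, based on the `MOVE_DEADLINE` and `MOVE_DEADLINE_FACTOR` constants in `MoveManagerImpl` (how exactly the factor is applied, and the command-sending call itself, are assumptions here rather than the final API):
   
   ```java
   // Sketch only; the constants exist in MoveManagerImpl, but how they are
   // combined and passed to ReplicationManager is an assumption here.
   long now = clock.millis();
   long datanodeDeadline = now + MOVE_DEADLINE;                        // 1 hour
   long scmDeadline = now + (long) (MOVE_DEADLINE * MOVE_DEADLINE_FACTOR);
   // Commands would be sent with LOW priority so that NORMAL-priority
   // replication work (decommission, under-replication) is processed first.
   ```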
   
   Recent changes leading up to this one have added several things which avoid the need for the Balancer / Move Manager to replicate its work across SCMs in the case of failover:
   
   1. Commands are more robustly expired on the DNs in the event of an SCM Term (leader) change.
   2. Replicate and Delete commands now have deadlines so they are expired on the DNs if they take too long to process.
   
   We also plan to ensure the balancer / move manager does not run until a short delay after a failover has passed, similar to how Replication Manager works.
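   
   Conceptually, the datanode-side expiry described in points 1 and 2 above boils down to a check like the following (a simplified, hypothetical sketch; the real logic lives in the datanode command queue handling and uses different types):
   
   ```java
   // Hypothetical sketch of the expiry rules above; the method and its
   // parameters are placeholders, not the real datanode classes.
   static boolean shouldDropCommand(long cmdTerm, long cmdDeadlineMs,
                                    long currentScmTerm, long nowMs) {
     // 1. Commands issued under an older SCM term (pre-failover leader) are dropped.
     if (cmdTerm < currentScmTerm) {
       return true;
     }
     // 2. Commands past their deadline are dropped rather than processed late.
     return cmdDeadlineMs > 0 && nowMs > cmdDeadlineMs;
   }
   ```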
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-6572
   
   ## How was this patch tested?
   
   Additional tests still need to be added.




[GitHub] [ozone] sodonnel commented on a diff in pull request #4274: HDDS-6572. EC: ReplicationManager - add move manager for container move

Posted by "sodonnel (via GitHub)" <gi...@apache.org>.
sodonnel commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1108777799


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();
+  private volatile boolean running = false;
+
+  public MoveManagerImpl(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager)
+      throws IOException {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID, MoveDataNodePair> getPendingMove() {
+    return pendingMoveOps;
+  }
+
+  /**
+   * Complete the move action for a given container.
+   *
+   * @param cid Container ID for which the move is finished
+   * @param mr  result used to complete the pending future; null means the
+   *            pending move should simply be discarded
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    if (pendingMoveOps.containsKey(cid)) {
+      CompletableFuture<MoveResult> future = pendingMoveFuture.remove(cid);
+      pendingMoveOps.remove(cid);
+      if (future != null && mr != null) {
+        // mr is passed as null when the caller only wants to clear the
+        // pending move and knows there is no future to complete. For
+        // example, after becoming the new leader we may need to clean up
+        // moves whose futures no longer exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoveOps.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoveOps.putIfAbsent(containerInfo.containerID(), mp);
+      pendingMoveFuture.putIfAbsent(containerInfo.containerID(), ret);
+    }
+  }
+
+  /**
+   * notify MoveManager that the current scm has become leader and ready.
+   */
+  @Override
+  public void onLeaderReady() {
+    //discard all stale records
+    pendingMoveOps.clear();
+    pendingMoveFuture.clear();
+    running = true;
+  }
+
+  /**
+   * notify MoveManager that the current scm leader steps down.
+   */
+  @Override
+  public void onNotLeader() {
+    running = false;
+  }
+
+  /**
+   * move a container replica from source datanode to
+   * target datanode. A move is a two-part operation. First, a replication
+   * command is scheduled to create a new copy of the replica. Later, when
+   * the replication completes, a delete is scheduled to remove the original
+   * replica.
+   *
+   * @param cid Container to move
+   * @param src source datanode
+   * @param tgt target datanode
+   */
+  @Override
+  public CompletableFuture<MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException,
+      TimeoutException, ContainerReplicaNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+
+    if (!running) {
+      ret.complete(MoveResult.FAIL_LEADER_NOT_READY);
+      return ret;
+    }
+
+    // Ensure src and tgt are IN_SERVICE and HEALTHY
+    for (DatanodeDetails dn : Arrays.asList(src, tgt)) {
+      NodeStatus currentNodeStatus = replicationManager.getNodeStatus(dn);
+      if (currentNodeStatus.getHealth() != HddsProtos.NodeState.HEALTHY) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        return ret;
+      }
+      if (currentNodeStatus.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        return ret;
+      }
+    }
+
+    // Ensure the container exists on the src and is not present on the target
+    ContainerInfo containerInfo = containerManager.getContainer(cid);
+    final Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+
+    boolean srcExists = false;
+    for (ContainerReplica r : currentReplicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcExists = true;
+      }
+      if (r.getDatanodeDetails().equals(tgt)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+    }
+    if (!srcExists) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+      return ret;
+    }
+
+    /*
+     * Ensure the container has no inflight actions.
+     * If the container is already being replicated or deleted, the number
+     * of its replicas is not deterministic, so a move issued by the
+     * balancer could produce a nondeterministic result. In that case we
+     * drop the move request for now.
+     */
+    List<ContainerReplicaOp> pendingOps =
+        replicationManager.getPendingReplicationOps(cid);
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+    }
+
+    // Ensure the container is CLOSED
+    HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+    if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+      ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      return ret;
+    }
+
+    // Create a set of replicas that indicates how the container will look
+    // after the move and ensure it is healthy - ie not under, over or mis
+    // replicated.
+    Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+        containerInfo, src, tgt, currentReplicas);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+    if (healthResult.getHealthState()
+        != ContainerHealthResult.HealthState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      return ret;
+    }
+    startMove(containerInfo, src, tgt, ret);
+    LOG.debug("Processed a move request for container {}, from {} to {}",
+        cid, src.getUuid(), tgt.getUuid());
+    return ret;
+  }
+
+  /**
+   * Notify Move Manager that a container op has been completed.
+   *
+   * @param containerReplicaOp ContainerReplicaOp which has completed
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        if (pendingMoveFuture.containsKey(containerID)) {
+          try {
+            handleSuccessfulAdd(containerID);
+          } catch (ContainerNotFoundException | NodeNotFoundException |
+                   ContainerReplicaNotFoundException e) {
+            LOG.warn("Can not handle successful Add for move", e);
+          }
+        } else {
+          LOG.warn("No matching entry found in pendingMoveFuture for " +
+              "containerID {}. Should not happen", containerID);
+          completeMove(containerID, null);
+        }
+      } else if (
+          opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.COMPLETED);
+      }
+    }
+  }
+
+  /**
+   * Notify Move Manager that a container op has expired.
+   *
+   * @param containerReplicaOp ContainerReplicaOp
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        completeMove(containerID, MoveResult.REPLICATION_FAIL_TIME_OUT);
+      } else if (opType.equals(PendingOpType.DELETE) &&
+          mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.DELETION_FAIL_TIME_OUT);
+      }
+    }
+  }
+
+  private void handleSuccessfulAdd(final ContainerID cid)
+      throws ContainerNotFoundException,
+      ContainerReplicaNotFoundException, NodeNotFoundException {
+    MoveDataNodePair movePair = pendingMoveOps.get(cid);
+    if (movePair == null) {
+      return;
+    }
+    final DatanodeDetails src = movePair.getSrc();
+    Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+    if (currentReplicas.stream()
+        .noneMatch(r -> r.getDatanodeDetails().equals(src))) {
+      // If the target is present but the source has disappeared somehow,
+      // we can consider the move successful.
+      completeMove(cid, MoveResult.COMPLETED);
+      return;
+    }
+
+    final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
+    if (nodeStatus.getOperationalState()
+        != HddsProtos.NodeOperationalState.IN_SERVICE) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+      return;
+    }
+    if (!nodeStatus.isHealthy()) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+      return;
+    }
+
+    Set<ContainerReplica> futureReplicas = currentReplicas.stream()
+        .filter(r -> !r.getDatanodeDetails().equals(src))
+        .collect(Collectors.toSet());
+
+    final ContainerInfo containerInfo = containerManager.getContainer(cid);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, futureReplicas);
+
+    if (healthResult.getHealthState() ==
+        ContainerHealthResult.HealthState.HEALTHY) {
+      sendDeleteCommand(containerInfo, src);
+    } else {
+      LOG.info("Cannot remove source replica as the container health would " +
+          " be {}", healthResult.getHealthState());
+      completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
+    }
+  }
+
+  private Set<ContainerReplica> createReplicaSetAfterMove(
+      ContainerInfo containerInfo, DatanodeDetails src,
+      DatanodeDetails tgt, Set<ContainerReplica> existing) {
+    Set<ContainerReplica> replicas = new HashSet<>(existing);
+    ContainerReplica srcReplica = null;
+    for (ContainerReplica r : replicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcReplica = r;
+        break;
+      }
+    }
+    if (srcReplica == null) {
+      throw new RuntimeException("The source replica is not present");

Review Comment:
   Good idea





[GitHub] [ozone] sodonnel commented on a diff in pull request #4274: HDDS-6572. EC: ReplicationManager - add move manager for container move

Posted by "sodonnel (via GitHub)" <gi...@apache.org>.
sodonnel commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1108761857


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();

Review Comment:
   OK - I have changed this to a single map of Pair<>
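   
   For reference, the combined structure would look roughly like this (illustrative only; the exact Pair type, e.g. something like org.apache.commons.lang3.tuple.Pair, and the field name are assumptions, not necessarily what landed in the patch):
   
   ```java
   // Illustrative only: one map keyed by container, holding both the
   // source/target pair and the future, so the two cannot get out of sync.
   private final Map<ContainerID,
       Pair<MoveDataNodePair, CompletableFuture<MoveResult>>> pendingMoves =
           new ConcurrentHashMap<>();
   
   // e.g. when starting a move:
   pendingMoves.putIfAbsent(containerInfo.containerID(),
       Pair.of(new MoveDataNodePair(src, tgt), ret));
   ```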





[GitHub] [ozone] adoroszlai merged pull request #4274: HDDS-6572. EC: ReplicationManager - add move manager for container move

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai merged PR #4274:
URL: https://github.com/apache/ozone/pull/4274




[GitHub] [ozone] adoroszlai commented on a diff in pull request #4274: HDDS-6572. EC: ReplicationManager - add move manager for container move

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1107383691


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();

Review Comment:
   Should we use a single `ConcurrentHashMap` with combined value instead of two separate ones to avoid race conditions?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();
+  private volatile boolean running = false;
+
+  public MoveManagerImpl(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager)
+      throws IOException {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID, MoveDataNodePair> getPendingMove() {
+    return pendingMoveOps;
+  }
+
+  /**
+   * Complete the move action for a given container.
+   *
+   * @param cid Container ID for which the move is finished
+   * @param mr  result used to complete the pending future; null means the
+   *            pending move should simply be discarded
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    if (pendingMoveOps.containsKey(cid)) {
+      CompletableFuture<MoveResult> future = pendingMoveFuture.remove(cid);
+      pendingMoveOps.remove(cid);
+      if (future != null && mr != null) {
+        // mr is passed as null when the caller only wants to clear the
+        // pending move and knows there is no future to complete. For
+        // example, after becoming the new leader we may need to clean up
+        // moves whose futures no longer exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoveOps.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoveOps.putIfAbsent(containerInfo.containerID(), mp);
+      pendingMoveFuture.putIfAbsent(containerInfo.containerID(), ret);
+    }
+  }
+
+  /**
+   * notify MoveManager that the current scm has become leader and ready.
+   */
+  @Override
+  public void onLeaderReady() {
+    //discard all stale records
+    pendingMoveOps.clear();
+    pendingMoveFuture.clear();
+    running = true;
+  }
+
+  /**
+   * notify MoveManager that the current scm leader steps down.
+   */
+  @Override
+  public void onNotLeader() {
+    running = false;
+  }
+
+  /**
+   * move a container replica from source datanode to
+   * target datanode. A move is a two-part operation. First, a replication
+   * command is scheduled to create a new copy of the replica. Later, when
+   * the replication completes, a delete is scheduled to remove the original
+   * replica.
+   *
+   * @param cid Container to move
+   * @param src source datanode
+   * @param tgt target datanode
+   */
+  @Override
+  public CompletableFuture<MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException,
+      TimeoutException, ContainerReplicaNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+
+    if (!running) {
+      ret.complete(MoveResult.FAIL_LEADER_NOT_READY);
+      return ret;
+    }
+
+    // Ensure src and tgt are IN_SERVICE and HEALTHY
+    for (DatanodeDetails dn : Arrays.asList(src, tgt)) {
+      NodeStatus currentNodeStatus = replicationManager.getNodeStatus(dn);
+      if (currentNodeStatus.getHealth() != HddsProtos.NodeState.HEALTHY) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        return ret;
+      }
+      if (currentNodeStatus.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        return ret;
+      }
+    }
+
+    // Ensure the container exists on the src and is not present on the target
+    ContainerInfo containerInfo = containerManager.getContainer(cid);
+    final Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+
+    boolean srcExists = false;
+    for (ContainerReplica r : currentReplicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcExists = true;
+      }
+      if (r.getDatanodeDetails().equals(tgt)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+    }
+    if (!srcExists) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+      return ret;
+    }
+
+    /*
+     * Ensure the container has no inflight actions.
+     * If the container is already being replicated or deleted, the number
+     * of its replicas is not deterministic, so a move issued by the
+     * balancer could produce a nondeterministic result. In that case we
+     * drop the move request for now.
+     */
+    List<ContainerReplicaOp> pendingOps =
+        replicationManager.getPendingReplicationOps(cid);
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+    }
+
+    // Ensure the container is CLOSED
+    HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+    if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+      ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      return ret;
+    }
+
+    // Create a set of replicas that indicates how the container will look
+    // after the move and ensure it is healthy - ie not under, over or mis
+    // replicated.
+    Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+        containerInfo, src, tgt, currentReplicas);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+    if (healthResult.getHealthState()
+        != ContainerHealthResult.HealthState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      return ret;
+    }
+    startMove(containerInfo, src, tgt, ret);
+    LOG.debug("Processed a move request for container {}, from {} to {}",
+        cid, src.getUuid(), tgt.getUuid());
+    return ret;
+  }
+
+  /**
+   * Notify Move Manager that a container op has been completed.
+   *
+   * @param containerReplicaOp ContainerReplicaOp which has completed
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        if (pendingMoveFuture.containsKey(containerID)) {
+          try {
+            handleSuccessfulAdd(containerID);
+          } catch (ContainerNotFoundException | NodeNotFoundException |
+                   ContainerReplicaNotFoundException e) {
+            LOG.warn("Can not handle successful Add for move", e);
+          }
+        } else {
+          LOG.warn("No matching entry found in pendingMoveFuture for " +
+              "containerID {}. Should not happen", containerID);
+          completeMove(containerID, null);
+        }
+      } else if (
+          opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.COMPLETED);
+      }
+    }
+  }
+
+  /**
+   * Notify Move Manager that a container op has expired.
+   *
+   * @param containerReplicaOp ContainerReplicaOp
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        completeMove(containerID, MoveResult.REPLICATION_FAIL_TIME_OUT);
+      } else if (opType.equals(PendingOpType.DELETE) &&
+          mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.DELETION_FAIL_TIME_OUT);
+      }
+    }
+  }
+
+  private void handleSuccessfulAdd(final ContainerID cid)
+      throws ContainerNotFoundException,
+      ContainerReplicaNotFoundException, NodeNotFoundException {
+    MoveDataNodePair movePair = pendingMoveOps.get(cid);
+    if (movePair == null) {
+      return;
+    }
+    final DatanodeDetails src = movePair.getSrc();
+    Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+    if (currentReplicas.stream()
+        .noneMatch(r -> r.getDatanodeDetails().equals(src))) {
+      // If the target is present but the source has disappeared somehow,
+      // we can consider the move successful.
+      completeMove(cid, MoveResult.COMPLETED);
+      return;
+    }
+
+    final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
+    if (nodeStatus.getOperationalState()
+        != HddsProtos.NodeOperationalState.IN_SERVICE) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+      return;
+    }
+    if (!nodeStatus.isHealthy()) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+      return;
+    }
+
+    Set<ContainerReplica> futureReplicas = currentReplicas.stream()
+        .filter(r -> !r.getDatanodeDetails().equals(src))
+        .collect(Collectors.toSet());
+
+    final ContainerInfo containerInfo = containerManager.getContainer(cid);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, futureReplicas);
+
+    if (healthResult.getHealthState() ==
+        ContainerHealthResult.HealthState.HEALTHY) {
+      sendDeleteCommand(containerInfo, src);
+    } else {
+      LOG.info("Cannot remove source replica as the container health would " +
+          " be {}", healthResult.getHealthState());
+      completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
+    }
+  }
+
+  private Set<ContainerReplica> createReplicaSetAfterMove(
+      ContainerInfo containerInfo, DatanodeDetails src,
+      DatanodeDetails tgt, Set<ContainerReplica> existing) {
+    Set<ContainerReplica> replicas = new HashSet<>(existing);
+    ContainerReplica srcReplica = null;
+    for (ContainerReplica r : replicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcReplica = r;
+        break;
+      }
+    }
+    if (srcReplica == null) {
+      throw new RuntimeException("The source replica is not present");
+    }
+    replicas.remove(srcReplica);
+    replicas.add(ContainerReplica.newBuilder()
+        .setDatanodeDetails(tgt)
+        .setContainerID(containerInfo.containerID())
+        .setContainerState(StorageContainerDatanodeProtocolProtos
+            .ContainerReplicaProto.State.CLOSED)
+        .setOriginNodeId(srcReplica.getOriginDatanodeId())
+        .setSequenceId(srcReplica.getSequenceId())
+        .setBytesUsed(srcReplica.getBytesUsed())
+        .setKeyCount(srcReplica.getKeyCount())
+        .setReplicaIndex(srcReplica.getReplicaIndex())

Review Comment:
   If you update the patch for other reasons, these lines can be omitted by using the new `srcReplica.toBuilder()` method.
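   
   For example, something along these lines, assuming `ContainerReplica#toBuilder()` pre-fills the builder with every field of `srcReplica` so only the changed attributes need to be set again:
   
   ```java
   // Sketch of the suggestion; assumes toBuilder() copies all fields of
   // srcReplica, so only the datanode (and state, if needed) are overridden.
   replicas.add(srcReplica.toBuilder()
       .setDatanodeDetails(tgt)
       .setContainerState(StorageContainerDatanodeProtocolProtos
           .ContainerReplicaProto.State.CLOSED)
       .build());
   ```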



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();
+  private volatile boolean running = false;
+
+  public MoveManagerImpl(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager)
+      throws IOException {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID, MoveDataNodePair> getPendingMove() {
+    return pendingMoveOps;
+  }
+
+  /**
+   * Complete the move action for a given container.
+   *
+   * @param cid Container ID for which the move is finished
+   * @param mr  result used to complete the pending future; null means the
+   *            pending move should simply be discarded
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    if (pendingMoveOps.containsKey(cid)) {
+      CompletableFuture<MoveResult> future = pendingMoveFuture.remove(cid);
+      pendingMoveOps.remove(cid);
+      if (future != null && mr != null) {
+        // mr is passed as null when the caller only wants to clear the
+        // pending move and knows there is no future to complete. For
+        // example, after becoming the new leader we may need to clean up
+        // moves whose futures no longer exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoveOps.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoveOps.putIfAbsent(containerInfo.containerID(), mp);
+      pendingMoveFuture.putIfAbsent(containerInfo.containerID(), ret);
+    }
+  }
+
+  /**
+   * notify MoveManager that the current scm has become leader and ready.
+   */
+  @Override
+  public void onLeaderReady() {
+    //discard all stale records
+    pendingMoveOps.clear();
+    pendingMoveFuture.clear();
+    running = true;
+  }
+
+  /**
+   * notify MoveManager that the current scm leader steps down.
+   */
+  @Override
+  public void onNotLeader() {
+    running = false;
+  }
+
+  /**
+   * move a container replica from source datanode to
+   * target datanode. A move is a two-part operation. First, a replication
+   * command is scheduled to create a new copy of the replica. Later, when
+   * the replication completes, a delete is scheduled to remove the original
+   * replica.
+   *
+   * @param cid Container to move
+   * @param src source datanode
+   * @param tgt target datanode
+   */
+  @Override
+  public CompletableFuture<MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException,
+      TimeoutException, ContainerReplicaNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+
+    if (!running) {
+      ret.complete(MoveResult.FAIL_LEADER_NOT_READY);
+      return ret;
+    }
+
+    // Ensure src and tgt are IN_SERVICE and HEALTHY
+    for (DatanodeDetails dn : Arrays.asList(src, tgt)) {
+      NodeStatus currentNodeStatus = replicationManager.getNodeStatus(dn);
+      if (currentNodeStatus.getHealth() != HddsProtos.NodeState.HEALTHY) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        return ret;
+      }
+      if (currentNodeStatus.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        return ret;
+      }
+    }
+
+    // Ensure the container exists on the src and is not present on the target
+    ContainerInfo containerInfo = containerManager.getContainer(cid);
+    final Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+
+    boolean srcExists = false;
+    for (ContainerReplica r : currentReplicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcExists = true;
+      }
+      if (r.getDatanodeDetails().equals(tgt)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+    }
+    if (!srcExists) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+      return ret;
+    }
+
+    /*
+     * Ensure the container has no inflight actions.
+     * If the container is already being replicated or deleted, the number
+     * of its replicas is not deterministic, so a move issued by the
+     * balancer could produce a nondeterministic result. In that case we
+     * drop the move request for now.
+     */
+    List<ContainerReplicaOp> pendingOps =
+        replicationManager.getPendingReplicationOps(cid);
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+    }
+
+    // Ensure the container is CLOSED
+    HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+    if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+      ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      return ret;
+    }
+
+    // Create a set of replicas that indicates how the container will look
+    // after the move and ensure it is healthy - ie not under, over or mis
+    // replicated.
+    Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+        containerInfo, src, tgt, currentReplicas);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+    if (healthResult.getHealthState()
+        != ContainerHealthResult.HealthState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      return ret;
+    }
+    startMove(containerInfo, src, tgt, ret);
+    LOG.debug("Processed a move request for container {}, from {} to {}",
+        cid, src.getUuid(), tgt.getUuid());

Review Comment:
   Nit: 
   ```suggestion
           cid, src.getUuidString(), tgt.getUuidString());
   ```



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();
+  private volatile boolean running = false;
+
+  public MoveManagerImpl(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager)
+      throws IOException {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID, MoveDataNodePair> getPendingMove() {
+    return pendingMoveOps;
+  }
+
+  /**
+   * Complete a move action for a given container.
+   *
+   * @param cid Container id for which the move operation has finished
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    if (pendingMoveOps.containsKey(cid)) {
+      CompletableFuture<MoveResult> future = pendingMoveFuture.remove(cid);
+      pendingMoveOps.remove(cid);
+      if (future != null && mr != null) {
+        // when we know the future is null and we want to complete
+        // the move, then we set mr to null.
+        // for example, after becoming a new leader, we might need
+        // to complete some moves and we know these futures do not
+        // exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoveOps.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoveOps.putIfAbsent(containerInfo.containerID(), mp);
+      pendingMoveFuture.putIfAbsent(containerInfo.containerID(), ret);
+    }
+  }
+
+  /**
+   * notify MoveManager that the current scm has become leader and ready.
+   */
+  @Override
+  public void onLeaderReady() {
+    //discard all stale records
+    pendingMoveOps.clear();
+    pendingMoveFuture.clear();
+    running = true;
+  }
+
+  /**
+   * notify MoveManager that the current scm leader steps down.
+   */
+  @Override
+  public void onNotLeader() {
+    running = false;
+  }
+
+  /**
+   * move a container replica from source datanode to
+   * target datanode. A move is a two part operation. First a replication
+   * command is scheduled to create a new copy of the replica. Later, when the
+   * replication completes a delete is scheduled to remove the original replica.
+   *
+   * @param cid Container to move
+   * @param src source datanode
+   * @param tgt target datanode
+   */
+  @Override
+  public CompletableFuture<MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException,
+      TimeoutException, ContainerReplicaNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+
+    if (!running) {
+      ret.complete(MoveResult.FAIL_LEADER_NOT_READY);
+      return ret;
+    }
+
+    // Ensure src and tgt are IN_SERVICE and HEALTHY
+    for (DatanodeDetails dn : Arrays.asList(src, tgt)) {
+      NodeStatus currentNodeStatus = replicationManager.getNodeStatus(dn);
+      if (currentNodeStatus.getHealth() != HddsProtos.NodeState.HEALTHY) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        return ret;
+      }
+      if (currentNodeStatus.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        return ret;
+      }
+    }
+
+    // Ensure the container exists on the src and is not present on the target
+    ContainerInfo containerInfo = containerManager.getContainer(cid);
+    final Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+
+    boolean srcExists = false;
+    for (ContainerReplica r : currentReplicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcExists = true;
+      }
+      if (r.getDatanodeDetails().equals(tgt)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+    }
+    if (!srcExists) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+      return ret;
+    }
+
+    /*
+     * Ensure the container has no inflight actions.
+     * The reason the given container should not have any inflight action
+     * is that, if the container is being replicated or deleted, the number
+     * of its replicas is not deterministic, so a move issued by the balancer
+     * may produce a nondeterministic result; we therefore drop the request
+     * for now.
+     */
+    List<ContainerReplicaOp> pendingOps =
+        replicationManager.getPendingReplicationOps(cid);
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+    }
+
+    // Ensure the container is CLOSED
+    HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+    if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+      ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      return ret;
+    }
+
+    // Create a set of replicas that indicates how the container will look
+    // after the move and ensure it is healthy - ie not under, over or mis
+    // replicated.
+    Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+        containerInfo, src, tgt, currentReplicas);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+    if (healthResult.getHealthState()
+        != ContainerHealthResult.HealthState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      return ret;
+    }
+    startMove(containerInfo, src, tgt, ret);
+    LOG.debug("Processed a move request for container {}, from {} to {}",
+        cid, src.getUuid(), tgt.getUuid());
+    return ret;
+  }
+
+  /**
+   * Notify Move Manager that a container op has been completed.
+   *
+   * @param containerReplicaOp ContainerReplicaOp which has completed
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        if (pendingMoveFuture.containsKey(containerID)) {
+          try {
+            handleSuccessfulAdd(containerID);
+          } catch (ContainerNotFoundException | NodeNotFoundException |
+                   ContainerReplicaNotFoundException e) {
+            LOG.warn("Can not handle successful Add for move", e);
+          }
+        } else {
+          LOG.warn("No matching entry found in pendingMoveFuture for " +
+              "containerID {}. Should not happen", containerID);
+          completeMove(containerID, null);
+        }
+      } else if (
+          opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.COMPLETED);
+      }
+    }
+  }
+
+   /**
+   * Notify Move Manager that a container op has been Expired.
+   *
+   * @param containerReplicaOp ContainerReplicaOp
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        completeMove(containerID, MoveResult.REPLICATION_FAIL_TIME_OUT);
+      } else if (opType.equals(PendingOpType.DELETE) &&
+          mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.DELETION_FAIL_TIME_OUT);
+      }
+    }
+  }
+
+  private void handleSuccessfulAdd(final ContainerID cid)
+      throws ContainerNotFoundException,
+      ContainerReplicaNotFoundException, NodeNotFoundException {
+    MoveDataNodePair movePair = pendingMoveOps.get(cid);
+    if (movePair == null) {
+      return;
+    }
+    final DatanodeDetails src = movePair.getSrc();
+    Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+    if (currentReplicas.stream()
+        .noneMatch(r -> r.getDatanodeDetails().equals(src))) {
+      // if the target is present but the source disappears somehow,
+      // we can consider the move successful.
+      completeMove(cid, MoveResult.COMPLETED);
+      return;
+    }
+
+    final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
+    if (nodeStatus.getOperationalState()
+        != HddsProtos.NodeOperationalState.IN_SERVICE) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+      return;
+    }
+    if (!nodeStatus.isHealthy()) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+      return;
+    }
+
+    Set<ContainerReplica> futureReplicas = currentReplicas.stream()
+        .filter(r -> !r.getDatanodeDetails().equals(src))
+        .collect(Collectors.toSet());
+
+    final ContainerInfo containerInfo = containerManager.getContainer(cid);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, futureReplicas);
+
+    if (healthResult.getHealthState() ==
+        ContainerHealthResult.HealthState.HEALTHY) {
+      sendDeleteCommand(containerInfo, src);
+    } else {
+      LOG.info("Cannot remove source replica as the container health would " +
+          " be {}", healthResult.getHealthState());
+      completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
+    }
+  }
+
+  private Set<ContainerReplica> createReplicaSetAfterMove(
+      ContainerInfo containerInfo, DatanodeDetails src,
+      DatanodeDetails tgt, Set<ContainerReplica> existing) {
+    Set<ContainerReplica> replicas = new HashSet<>(existing);
+    ContainerReplica srcReplica = null;
+    for (ContainerReplica r : replicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcReplica = r;
+        break;
+      }
+    }
+    if (srcReplica == null) {
+      throw new RuntimeException("The source replica is not present");

Review Comment:
   Nit:
   
   ```suggestion
         throw new IllegalArgumentException("The source replica is not present");
   ```
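   
   (IllegalArgumentException also documents the contract better here: move() has already verified that src holds a replica before createReplicaSetAfterMove runs, so reaching this branch would indicate a caller bug rather than a recoverable runtime condition.)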



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();
+  private volatile boolean running = false;
+
+  public MoveManagerImpl(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager)
+      throws IOException {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID, MoveDataNodePair> getPendingMove() {
+    return pendingMoveOps;
+  }
+
+  /**
+   * Complete a move action for a given container.
+   *
+   * @param cid Container id for which the move operation has finished
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    if (pendingMoveOps.containsKey(cid)) {
+      CompletableFuture<MoveResult> future = pendingMoveFuture.remove(cid);
+      pendingMoveOps.remove(cid);
+      if (future != null && mr != null) {
+        // when we know the future is null and we want to complete
+        // the move, then we set mr to null.
+        // for example, after becoming a new leader, we might need
+        // to complete some moves and we know these futures do not
+        // exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoveOps.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoveOps.putIfAbsent(containerInfo.containerID(), mp);
+      pendingMoveFuture.putIfAbsent(containerInfo.containerID(), ret);
+    }
+  }
+
+  /**
+   * notify MoveManager that the current scm has become leader and ready.
+   */
+  @Override
+  public void onLeaderReady() {
+    //discard all stale records
+    pendingMoveOps.clear();
+    pendingMoveFuture.clear();
+    running = true;
+  }
+
+  /**
+   * notify MoveManager that the current scm leader steps down.
+   */
+  @Override
+  public void onNotLeader() {
+    running = false;
+  }
+
+  /**
+   * move a container replica from source datanode to
+   * target datanode. A move is a two part operation. First a replication
+   * command is scheduled to create a new copy of the replica. Later, when the
+   * replication completes a delete is scheduled to remove the original replica.
+   *
+   * @param cid Container to move
+   * @param src source datanode
+   * @param tgt target datanode
+   */
+  @Override
+  public CompletableFuture<MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException,
+      TimeoutException, ContainerReplicaNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+
+    if (!running) {
+      ret.complete(MoveResult.FAIL_LEADER_NOT_READY);
+      return ret;
+    }
+
+    // Ensure src and tgt are IN_SERVICE and HEALTHY
+    for (DatanodeDetails dn : Arrays.asList(src, tgt)) {
+      NodeStatus currentNodeStatus = replicationManager.getNodeStatus(dn);
+      if (currentNodeStatus.getHealth() != HddsProtos.NodeState.HEALTHY) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        return ret;
+      }
+      if (currentNodeStatus.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        return ret;
+      }
+    }
+
+    // Ensure the container exists on the src and is not present on the target
+    ContainerInfo containerInfo = containerManager.getContainer(cid);
+    final Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+
+    boolean srcExists = false;
+    for (ContainerReplica r : currentReplicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcExists = true;
+      }
+      if (r.getDatanodeDetails().equals(tgt)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+    }
+    if (!srcExists) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+      return ret;
+    }
+
+    /*
+     * Ensure the container has no inflight actions.
+     * The reason the given container should not have any inflight action
+     * is that, if the container is being replicated or deleted, the number
+     * of its replicas is not deterministic, so a move issued by the balancer
+     * may produce a nondeterministic result; we therefore drop the request
+     * for now.
+     */
+    List<ContainerReplicaOp> pendingOps =
+        replicationManager.getPendingReplicationOps(cid);
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+    }
+
+    // Ensure the container is CLOSED
+    HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+    if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+      ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      return ret;
+    }
+
+    // Create a set of replicas that indicates how the container will look
+    // after the move and ensure it is healthy - ie not under, over or mis
+    // replicated.
+    Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+        containerInfo, src, tgt, currentReplicas);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+    if (healthResult.getHealthState()
+        != ContainerHealthResult.HealthState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      return ret;
+    }
+    startMove(containerInfo, src, tgt, ret);
+    LOG.debug("Processed a move request for container {}, from {} to {}",
+        cid, src.getUuid(), tgt.getUuid());
+    return ret;
+  }
+
+  /**
+   * Notify Move Manager that a container op has been completed.
+   *
+   * @param containerReplicaOp ContainerReplicaOp which has completed
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        if (pendingMoveFuture.containsKey(containerID)) {
+          try {
+            handleSuccessfulAdd(containerID);
+          } catch (ContainerNotFoundException | NodeNotFoundException |
+                   ContainerReplicaNotFoundException e) {
+            LOG.warn("Can not handle successful Add for move", e);
+          }
+        } else {
+          LOG.warn("No matching entry found in pendingMoveFuture for " +
+              "containerID {}. Should not happen", containerID);
+          completeMove(containerID, null);
+        }
+      } else if (
+          opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.COMPLETED);
+      }
+    }
+  }
+
+   /**
+   * Notify Move Manager that a container op has been Expired.
+   *
+   * @param containerReplicaOp ContainerReplicaOp
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        completeMove(containerID, MoveResult.REPLICATION_FAIL_TIME_OUT);
+      } else if (opType.equals(PendingOpType.DELETE) &&
+          mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.DELETION_FAIL_TIME_OUT);
+      }
+    }
+  }
+
+  private void handleSuccessfulAdd(final ContainerID cid)
+      throws ContainerNotFoundException,
+      ContainerReplicaNotFoundException, NodeNotFoundException {
+    MoveDataNodePair movePair = pendingMoveOps.get(cid);
+    if (movePair == null) {
+      return;
+    }
+    final DatanodeDetails src = movePair.getSrc();
+    Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+    if (currentReplicas.stream()
+        .noneMatch(r -> r.getDatanodeDetails().equals(src))) {
+      // if the target is present but the source disappears somehow,
+      // we can consider the move successful.
+      completeMove(cid, MoveResult.COMPLETED);
+      return;
+    }
+
+    final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
+    if (nodeStatus.getOperationalState()
+        != HddsProtos.NodeOperationalState.IN_SERVICE) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+      return;
+    }
+    if (!nodeStatus.isHealthy()) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+      return;
+    }
+
+    Set<ContainerReplica> futureReplicas = currentReplicas.stream()
+        .filter(r -> !r.getDatanodeDetails().equals(src))
+        .collect(Collectors.toSet());

Review Comment:
   Nit: we can avoid both streams by using `Collection#removeIf`:
   
   ```java
       Set<ContainerReplica> futureReplicas = new HashSet<>(currentReplicas);
       boolean found = futureReplicas.removeIf(r -> r.getDatanodeDetails().equals(src));
       if (!found) {
         // if the target is present but the source disappears somehow,
         // we can consider the move successful.
         completeMove(cid, MoveResult.COMPLETED);
         return;
       }
   ```
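   
   (A side benefit: removeIf returns whether anything was actually removed, so the same call both builds futureReplicas and detects that the source replica has already disappeared, without a second pass over the set.)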





[GitHub] [ozone] sodonnel commented on a diff in pull request #4274: HDDS-6572. EC: ReplicationManager - add move manager for container move

Posted by "sodonnel (via GitHub)" <gi...@apache.org>.
sodonnel commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1108762231


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();
+  private volatile boolean running = false;
+
+  public MoveManagerImpl(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager)
+      throws IOException {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID, MoveDataNodePair> getPendingMove() {
+    return pendingMoveOps;
+  }
+
+  /**
+   * Complete a move action for a given container.
+   *
+   * @param cid Container id for which the move operation has finished
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    if (pendingMoveOps.containsKey(cid)) {
+      CompletableFuture<MoveResult> future = pendingMoveFuture.remove(cid);
+      pendingMoveOps.remove(cid);
+      if (future != null && mr != null) {
+        // when we know the future is null and we want to complete
+        // the move, then we set mr to null.
+        // for example, after becoming a new leader, we might need
+        // to complete some moves and we know these futures do not
+        // exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoveOps.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoveOps.putIfAbsent(containerInfo.containerID(), mp);
+      pendingMoveFuture.putIfAbsent(containerInfo.containerID(), ret);
+    }
+  }
+
+  /**
+   * notify MoveManager that the current scm has become leader and ready.
+   */
+  @Override
+  public void onLeaderReady() {
+    //discard all stale records
+    pendingMoveOps.clear();
+    pendingMoveFuture.clear();
+    running = true;
+  }
+
+  /**
+   * notify MoveManager that the current scm leader steps down.
+   */
+  @Override
+  public void onNotLeader() {
+    running = false;
+  }
+
+  /**
+   * move a container replica from source datanode to
+   * target datanode. A move is a two part operation. First a replication
+   * command is scheduled to create a new copy of the replica. Later, when the
+   * replication completes a delete is scheduled to remove the original replica.
+   *
+   * @param cid Container to move
+   * @param src source datanode
+   * @param tgt target datanode
+   */
+  @Override
+  public CompletableFuture<MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException,
+      TimeoutException, ContainerReplicaNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+
+    if (!running) {
+      ret.complete(MoveResult.FAIL_LEADER_NOT_READY);
+      return ret;
+    }
+
+    // Ensure src and tgt are IN_SERVICE and HEALTHY
+    for (DatanodeDetails dn : Arrays.asList(src, tgt)) {
+      NodeStatus currentNodeStatus = replicationManager.getNodeStatus(dn);
+      if (currentNodeStatus.getHealth() != HddsProtos.NodeState.HEALTHY) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        return ret;
+      }
+      if (currentNodeStatus.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        return ret;
+      }
+    }
+
+    // Ensure the container exists on the src and is not present on the target
+    ContainerInfo containerInfo = containerManager.getContainer(cid);
+    final Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+
+    boolean srcExists = false;
+    for (ContainerReplica r : currentReplicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcExists = true;
+      }
+      if (r.getDatanodeDetails().equals(tgt)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+    }
+    if (!srcExists) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+      return ret;
+    }
+
+    /*
+     * Ensure the container has no inflight actions.
+     * The reason the given container should not have any inflight action
+     * is that, if the container is being replicated or deleted, the number
+     * of its replicas is not deterministic, so a move issued by the balancer
+     * may produce a nondeterministic result; we therefore drop the request
+     * for now.
+     */
+    List<ContainerReplicaOp> pendingOps =
+        replicationManager.getPendingReplicationOps(cid);
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+    }
+
+    // Ensure the container is CLOSED
+    HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+    if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+      ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      return ret;
+    }
+
+    // Create a set of replicas that indicates how the container will look
+    // after the move and ensure it is healthy - ie not under, over or mis
+    // replicated.
+    Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+        containerInfo, src, tgt, currentReplicas);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+    if (healthResult.getHealthState()
+        != ContainerHealthResult.HealthState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      return ret;
+    }
+    startMove(containerInfo, src, tgt, ret);
+    LOG.debug("Processed a move request for container {}, from {} to {}",
+        cid, src.getUuid(), tgt.getUuid());

Review Comment:
   Fixed this.





[GitHub] [ozone] adoroszlai commented on a diff in pull request #4274: HDDS-6572. EC: ReplicationManager - add move manager for container move

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1108806472


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java:
##########
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManager implements
+    ContainerReplicaPendingOpsSubscriber {
+
+  /**
+   * Various move return results.
+   */
+  public enum MoveResult {
+    // both replication and deletion are completed
+    COMPLETED,
+    // RM is not ratis leader
+    FAIL_LEADER_NOT_READY,
+    // replication fail because the container does not exist in src
+    REPLICATION_FAIL_NOT_EXIST_IN_SOURCE,
+    // replication fail because the container exists in target
+    REPLICATION_FAIL_EXIST_IN_TARGET,
+    // replication fail because the container is not closed
+    REPLICATION_FAIL_CONTAINER_NOT_CLOSED,
+    // replication fail because the container is in inflightDeletion
+    REPLICATION_FAIL_INFLIGHT_DELETION,
+    // replication fail because the container is in inflightReplication
+    REPLICATION_FAIL_INFLIGHT_REPLICATION,
+    // replication fail because of timeout
+    REPLICATION_FAIL_TIME_OUT,
+    // replication fail because of node is not in service
+    REPLICATION_FAIL_NODE_NOT_IN_SERVICE,
+    // replication fail because node is unhealthy
+    REPLICATION_FAIL_NODE_UNHEALTHY,
+    // replication succeed, but deletion fail because of timeout
+    DELETION_FAIL_TIME_OUT,
+    // deletion fail because of node is not in service
+    DELETION_FAIL_NODE_NOT_IN_SERVICE,
+    // replication succeed, but deletion fail because
+    // node is unhealthy
+    DELETION_FAIL_NODE_UNHEALTHY,
+    // replication succeed, but if we delete the container from
+    // the source datanode , the policy(eg, replica num or
+    // rack location) will not be satisfied, so we should not delete
+    // the container
+    DELETE_FAIL_POLICY,
+    //  replicas + target - src does not satisfy placement policy
+    REPLICATION_NOT_HEALTHY,
+    //write DB error
+    FAIL_CAN_NOT_RECORD_TO_DB
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManager.class);
+
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+
+  private final Map<ContainerID,
+      Pair<CompletableFuture<MoveResult>, MoveDataNodePair>> pendingMoves =
+      new ConcurrentHashMap<>();
+
+  private volatile boolean running = false;
+
+  public MoveManager(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager) {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID,
+      Pair<CompletableFuture<MoveResult>, MoveDataNodePair>> getPendingMove() {
+    return pendingMoves;
+  }
+
+  /**
+   * Complete a move action for a given container.
+   *
+   * @param cid Container id for which the move operation has finished
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    Pair<CompletableFuture<MoveResult>, MoveDataNodePair> move =
+        pendingMoves.remove(cid);
+    if (move != null) {
+      CompletableFuture<MoveResult> future = move.getLeft();
+      if (future != null && mr != null) {
+        // when we know the future is null and we want to complete
+        // the move, then we set mr to null.
+        // for example, after becoming a new leader, we might need
+        // to complete some moves and we know these futures do not
+        // exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoves.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoves.putIfAbsent(containerInfo.containerID(), Pair.of(ret, mp));

Review Comment:
   Sorry I missed this earlier.  We should use an atomic operation instead of containsKey+putIfAbsent:
   
   ```suggestion
       pendingMoves.computeIfAbsent(containerInfo.containerID(), key -> {
         MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
         sendReplicateCommand(containerInfo, tgt, src);
         return Pair.of(ret, mp);
       });
   ```
   
   (Hope I got the syntax right.)
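   
   (For context: ConcurrentHashMap#computeIfAbsent applies the mapping function at most once per key and performs the check-and-insert atomically, whereas the original containsKey + putIfAbsent sequence leaves a window in which two callers could both see the key as absent and both send a replicate command.)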





[GitHub] [ozone] sodonnel commented on a diff in pull request #4274: HDDS-6572. EC: ReplicationManager - add move manager for container move

Posted by "sodonnel (via GitHub)" <gi...@apache.org>.
sodonnel commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1108882897


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java:
##########
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManager implements
+    ContainerReplicaPendingOpsSubscriber {
+
+  /**
+   * Various move return results.
+   */
+  public enum MoveResult {
+    // both replication and deletion are completed
+    COMPLETED,
+    // RM is not ratis leader
+    FAIL_LEADER_NOT_READY,
+    // replication fail because the container does not exist in src
+    REPLICATION_FAIL_NOT_EXIST_IN_SOURCE,
+    // replication fail because the container exists in target
+    REPLICATION_FAIL_EXIST_IN_TARGET,
+    // replication fail because the container is not closed
+    REPLICATION_FAIL_CONTAINER_NOT_CLOSED,
+    // replication fail because the container is in inflightDeletion
+    REPLICATION_FAIL_INFLIGHT_DELETION,
+    // replication fail because the container is in inflightReplication
+    REPLICATION_FAIL_INFLIGHT_REPLICATION,
+    // replication fail because of timeout
+    REPLICATION_FAIL_TIME_OUT,
+    // replication fail because of node is not in service
+    REPLICATION_FAIL_NODE_NOT_IN_SERVICE,
+    // replication fail because node is unhealthy
+    REPLICATION_FAIL_NODE_UNHEALTHY,
+    // replication succeed, but deletion fail because of timeout
+    DELETION_FAIL_TIME_OUT,
+    // deletion fail because of node is not in service
+    DELETION_FAIL_NODE_NOT_IN_SERVICE,
+    // replication succeed, but deletion fail because
+    // node is unhealthy
+    DELETION_FAIL_NODE_UNHEALTHY,
+    // replication succeed, but if we delete the container from
+    // the source datanode , the policy(eg, replica num or
+    // rack location) will not be satisfied, so we should not delete
+    // the container
+    DELETE_FAIL_POLICY,
+    //  replicas + target - src does not satisfy placement policy
+    REPLICATION_NOT_HEALTHY,
+    //write DB error
+    FAIL_CAN_NOT_RECORD_TO_DB
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManager.class);
+
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+
+  private final Map<ContainerID,
+      Pair<CompletableFuture<MoveResult>, MoveDataNodePair>> pendingMoves =
+      new ConcurrentHashMap<>();
+
+  private volatile boolean running = false;
+
+  public MoveManager(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager) {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID,
+      Pair<CompletableFuture<MoveResult>, MoveDataNodePair>> getPendingMove() {
+    return pendingMoves;
+  }
+
+  /**
+   * Complete a move action for a given container.
+   *
+   * @param cid Container id for which the move operation has finished
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    Pair<CompletableFuture<MoveResult>, MoveDataNodePair> move =
+        pendingMoves.remove(cid);
+    if (move != null) {
+      CompletableFuture<MoveResult> future = move.getLeft();
+      if (future != null && mr != null) {
+        // when we know the future is null and we want to complete
+        // the move, then we set mr to null.
+        // for example, after becoming a new leader, we might need
+        // to complete some moves and we know these futures do not
+        // exist.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoves.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoves.putIfAbsent(containerInfo.containerID(), Pair.of(ret, mp));

Review Comment:
   Yeah, makes sense. I had to change it a little to handle the exceptions thrown by sendReplicateCommand, but the general idea is the same.
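   
   One possible shape for that change, sketched here under the assumption that sendReplicateCommand still declares the checked ContainerNotFoundException / ContainerReplicaNotFoundException (which a computeIfAbsent lambda cannot rethrow directly, so the failure is surfaced through the returned future instead); this is not necessarily the exact code that landed in the PR:
   
   ```java
       pendingMoves.computeIfAbsent(containerInfo.containerID(), key -> {
         try {
           sendReplicateCommand(containerInfo, tgt, src);
         } catch (ContainerNotFoundException | ContainerReplicaNotFoundException e) {
           // Checked exceptions cannot escape the lambda, so surface the
           // failure through the caller's future and record no pending move.
           ret.completeExceptionally(e);
           return null; // computeIfAbsent adds no mapping when null is returned
         }
         return Pair.of(ret, new MoveDataNodePair(src, tgt));
       });
   ```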





[GitHub] [ozone] sodonnel commented on a diff in pull request #4274: HDDS-6572. EC: ReplicationManager - add move manager for container move

Posted by "sodonnel (via GitHub)" <gi...@apache.org>.
sodonnel commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1108764537


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();
+  private volatile boolean running = false;
+
+  public MoveManagerImpl(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager)
+      throws IOException {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID, MoveDataNodePair> getPendingMove() {
+    return pendingMoveOps;
+  }
+
+  /**
+   * Complete a move action for a given container.
+   *
+   * @param cid Container id for which the move operation has finished
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    if (pendingMoveOps.containsKey(cid)) {
+      CompletableFuture<MoveResult> future = pendingMoveFuture.remove(cid);
+      pendingMoveOps.remove(cid);
+      if (future != null && mr != null) {
+        // mr is passed as null when we only want to clear a pending move
+        // whose future is known not to exist. For example, after becoming
+        // the new leader we may need to complete some moves for which no
+        // caller is waiting on a future.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoveOps.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoveOps.putIfAbsent(containerInfo.containerID(), mp);
+      pendingMoveFuture.putIfAbsent(containerInfo.containerID(), ret);
+    }
+  }
+
+  /**
+   * notify MoveManager that the current scm has become leader and ready.
+   */
+  @Override
+  public void onLeaderReady() {
+    //discard all stale records
+    pendingMoveOps.clear();
+    pendingMoveFuture.clear();
+    running = true;
+  }
+
+  /**
+   * notify MoveManager that the current scm leader steps down.
+   */
+  @Override
+  public void onNotLeader() {
+    running = false;
+  }
+
+  /**
+   * move a container replica from source datanode to
+   * target datanode. A move is a two-part operation. First, a replication
+   * command is scheduled to create a new copy of the replica. Later, when the
+   * replication completes, a delete is scheduled to remove the original
+   * replica.
+   *
+   * @param cid Container to move
+   * @param src source datanode
+   * @param tgt target datanode
+   */
+  @Override
+  public CompletableFuture<MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException,
+      TimeoutException, ContainerReplicaNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+
+    if (!running) {
+      ret.complete(MoveResult.FAIL_LEADER_NOT_READY);
+      return ret;
+    }
+
+    // Ensure src and tgt are IN_SERVICE and HEALTHY
+    for (DatanodeDetails dn : Arrays.asList(src, tgt)) {
+      NodeStatus currentNodeStatus = replicationManager.getNodeStatus(dn);
+      if (currentNodeStatus.getHealth() != HddsProtos.NodeState.HEALTHY) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        return ret;
+      }
+      if (currentNodeStatus.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        return ret;
+      }
+    }
+
+    // Ensure the container exists on the src and is not present on the target
+    ContainerInfo containerInfo = containerManager.getContainer(cid);
+    final Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+
+    boolean srcExists = false;
+    for (ContainerReplica r : currentReplicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcExists = true;
+      }
+      if (r.getDatanodeDetails().equals(tgt)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+    }
+    if (!srcExists) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+      return ret;
+    }
+
+    /*
+     * Ensure the container has no inflight actions.
+     * If the container is currently being replicated or deleted, the number
+     * of its replicas is not deterministic, so a move issued by the balancer
+     * could produce a nondeterministic result. In that case the move request
+     * is dropped.
+     */
+    List<ContainerReplicaOp> pendingOps =
+        replicationManager.getPendingReplicationOps(cid);
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+    }
+
+    // Ensure the container is CLOSED
+    HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+    if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+      ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      return ret;
+    }
+
+    // Create a set of replicas that indicates how the container will look
+    // after the move and ensure it is healthy - i.e. not under-, over- or
+    // mis-replicated.
+    Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+        containerInfo, src, tgt, currentReplicas);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+    if (healthResult.getHealthState()
+        != ContainerHealthResult.HealthState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      return ret;
+    }
+    startMove(containerInfo, src, tgt, ret);
+    LOG.debug("Processed a move request for container {}, from {} to {}",
+        cid, src.getUuid(), tgt.getUuid());
+    return ret;
+  }
+
+  /**
+   * Notify Move Manager that a container op has been completed.
+   *
+   * @param containerReplicaOp ContainerReplicaOp which has completed
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        if (pendingMoveFuture.containsKey(containerID)) {
+          try {
+            handleSuccessfulAdd(containerID);
+          } catch (ContainerNotFoundException | NodeNotFoundException |
+                   ContainerReplicaNotFoundException e) {
+            LOG.warn("Can not handle successful Add for move", e);
+          }
+        } else {
+          LOG.warn("No matching entry found in pendingMoveFuture for " +
+              "containerID {}. Should not happen", containerID);
+          completeMove(containerID, null);
+        }
+      } else if (
+          opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.COMPLETED);
+      }
+    }
+  }
+
+  /**
+   * Notify Move Manager that a container op has expired.
+   *
+   * @param containerReplicaOp ContainerReplicaOp which has expired
+   * @param containerID ContainerID of the container the op applies to
+   */
+  private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        completeMove(containerID, MoveResult.REPLICATION_FAIL_TIME_OUT);
+      } else if (opType.equals(PendingOpType.DELETE) &&
+          mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.DELETION_FAIL_TIME_OUT);
+      }
+    }
+  }
+
+  private void handleSuccessfulAdd(final ContainerID cid)
+      throws ContainerNotFoundException,
+      ContainerReplicaNotFoundException, NodeNotFoundException {
+    MoveDataNodePair movePair = pendingMoveOps.get(cid);
+    if (movePair == null) {
+      return;
+    }
+    final DatanodeDetails src = movePair.getSrc();
+    Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+    if (currentReplicas.stream()
+        .noneMatch(r -> r.getDatanodeDetails().equals(src))) {
+      // If the target is present but the source has disappeared somehow,
+      // we can consider the move successful.
+      completeMove(cid, MoveResult.COMPLETED);
+      return;
+    }
+
+    final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
+    if (nodeStatus.getOperationalState()
+        != HddsProtos.NodeOperationalState.IN_SERVICE) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+      return;
+    }
+    if (!nodeStatus.isHealthy()) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+      return;
+    }
+
+    Set<ContainerReplica> futureReplicas = currentReplicas.stream()
+        .filter(r -> !r.getDatanodeDetails().equals(src))
+        .collect(Collectors.toSet());
+
+    final ContainerInfo containerInfo = containerManager.getContainer(cid);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, futureReplicas);
+
+    if (healthResult.getHealthState() ==
+        ContainerHealthResult.HealthState.HEALTHY) {
+      sendDeleteCommand(containerInfo, src);
+    } else {
+      LOG.info("Cannot remove source replica as the container health would " +
+          " be {}", healthResult.getHealthState());
+      completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
+    }
+  }
+
+  private Set<ContainerReplica> createReplicaSetAfterMove(
+      ContainerInfo containerInfo, DatanodeDetails src,
+      DatanodeDetails tgt, Set<ContainerReplica> existing) {
+    Set<ContainerReplica> replicas = new HashSet<>(existing);
+    ContainerReplica srcReplica = null;
+    for (ContainerReplica r : replicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcReplica = r;
+        break;
+      }
+    }
+    if (srcReplica == null) {
+      throw new RuntimeException("The source replica is not present");
+    }
+    replicas.remove(srcReplica);
+    replicas.add(ContainerReplica.newBuilder()
+        .setDatanodeDetails(tgt)
+        .setContainerID(containerInfo.containerID())
+        .setContainerState(StorageContainerDatanodeProtocolProtos
+            .ContainerReplicaProto.State.CLOSED)
+        .setOriginNodeId(srcReplica.getOriginDatanodeId())
+        .setSequenceId(srcReplica.getSequenceId())
+        .setBytesUsed(srcReplica.getBytesUsed())
+        .setKeyCount(srcReplica.getKeyCount())
+        .setReplicaIndex(srcReplica.getReplicaIndex())

Review Comment:
   Good suggestion.





[GitHub] [ozone] sodonnel commented on a diff in pull request #4274: HDDS-6572. EC: ReplicationManager - add move manager for container move

Posted by "sodonnel (via GitHub)" <gi...@apache.org>.
sodonnel commented on code in PR #4274:
URL: https://github.com/apache/ozone/pull/4274#discussion_r1108777225


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManagerImpl.java:
##########
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.balancer;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOpsSubscriber;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * A class which schedules, tracks and completes moves scheduled by the
+ * balancer.
+ */
+public final class MoveManagerImpl implements MoveManager,
+    ContainerReplicaPendingOpsSubscriber {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MoveManagerImpl.class);
+
+  // TODO - Delay moves until some time after failover
+  // TODO - Should pending ops notify under lock to allow MM to schedule a
+  //        delete after the move, but before anything else can, eg RM?
+
+  // TODO - these need to be config defined somewhere, probably in the balancer
+  private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
+  private static final double MOVE_DEADLINE_FACTOR = 0.95;
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+  private final Clock clock;
+  private final Map<ContainerID, CompletableFuture<MoveResult>>
+      pendingMoveFuture = new ConcurrentHashMap<>();
+  private final Map<ContainerID, MoveDataNodePair>
+      pendingMoveOps = new ConcurrentHashMap<>();
+  private volatile boolean running = false;
+
+  public MoveManagerImpl(final ReplicationManager replicationManager,
+      final Clock clock, final ContainerManager containerManager)
+      throws IOException {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+    this.clock = clock;
+  }
+
+  /**
+   * get all the pending move operations.
+   */
+  public Map<ContainerID, MoveDataNodePair> getPendingMove() {
+    return pendingMoveOps;
+  }
+
+  /**
+   * Complete a move action for a given container.
+   *
+   * @param cid Container id for which the move operation has finished
+   */
+  private void completeMove(final ContainerID cid, final MoveResult mr) {
+    if (pendingMoveOps.containsKey(cid)) {
+      CompletableFuture<MoveResult> future = pendingMoveFuture.remove(cid);
+      pendingMoveOps.remove(cid);
+      if (future != null && mr != null) {
+        // mr is passed as null when we only want to clear a pending move
+        // whose future is known not to exist. For example, after becoming
+        // the new leader we may need to complete some moves for which no
+        // caller is waiting on a future.
+        future.complete(mr);
+      }
+    }
+  }
+
+  /**
+   * start a move for a given container.
+   *
+   * @param containerInfo The container for which to move a replica
+   * @param src source datanode to copy the replica from
+   * @param tgt target datanode to copy the replica to
+   */
+  private void startMove(
+      final ContainerInfo containerInfo, final DatanodeDetails src,
+      final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret)
+      throws ContainerReplicaNotFoundException,
+      ContainerNotFoundException {
+    if (!pendingMoveOps.containsKey(containerInfo.containerID())) {
+      MoveDataNodePair mp = new MoveDataNodePair(src, tgt);
+      sendReplicateCommand(containerInfo, tgt, src);
+      pendingMoveOps.putIfAbsent(containerInfo.containerID(), mp);
+      pendingMoveFuture.putIfAbsent(containerInfo.containerID(), ret);
+    }
+  }
+
+  /**
+   * notify MoveManager that the current scm has become leader and ready.
+   */
+  @Override
+  public void onLeaderReady() {
+    //discard all stale records
+    pendingMoveOps.clear();
+    pendingMoveFuture.clear();
+    running = true;
+  }
+
+  /**
+   * notify MoveManager that the current scm leader steps down.
+   */
+  @Override
+  public void onNotLeader() {
+    running = false;
+  }
+
+  /**
+   * move a container replica from source datanode to
+   * target datanode. A move is a two-part operation. First, a replication
+   * command is scheduled to create a new copy of the replica. Later, when the
+   * replication completes, a delete is scheduled to remove the original
+   * replica.
+   *
+   * @param cid Container to move
+   * @param src source datanode
+   * @param tgt target datanode
+   */
+  @Override
+  public CompletableFuture<MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws ContainerNotFoundException, NodeNotFoundException,
+      TimeoutException, ContainerReplicaNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+
+    if (!running) {
+      ret.complete(MoveResult.FAIL_LEADER_NOT_READY);
+      return ret;
+    }
+
+    // Ensure src and tgt are IN_SERVICE and HEALTHY
+    for (DatanodeDetails dn : Arrays.asList(src, tgt)) {
+      NodeStatus currentNodeStatus = replicationManager.getNodeStatus(dn);
+      if (currentNodeStatus.getHealth() != HddsProtos.NodeState.HEALTHY) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        return ret;
+      }
+      if (currentNodeStatus.getOperationalState()
+          != HddsProtos.NodeOperationalState.IN_SERVICE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        return ret;
+      }
+    }
+
+    // Ensure the container exists on the src and is not present on the target
+    ContainerInfo containerInfo = containerManager.getContainer(cid);
+    final Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+
+    boolean srcExists = false;
+    for (ContainerReplica r : currentReplicas) {
+      if (r.getDatanodeDetails().equals(src)) {
+        srcExists = true;
+      }
+      if (r.getDatanodeDetails().equals(tgt)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+    }
+    if (!srcExists) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+      return ret;
+    }
+
+    /*
+     * Ensure the container has no inflight actions.
+     * If the container is currently being replicated or deleted, the number
+     * of its replicas is not deterministic, so a move issued by the balancer
+     * could produce a nondeterministic result. In that case the move request
+     * is dropped.
+     */
+    List<ContainerReplicaOp> pendingOps =
+        replicationManager.getPendingReplicationOps(cid);
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+    }
+
+    // Ensure the container is CLOSED
+    HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
+    if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
+      ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      return ret;
+    }
+
+    // Create a set of replicas that indicates how the container will look
+    // after the move and ensure it is healthy - i.e. not under-, over- or
+    // mis-replicated.
+    Set<ContainerReplica> replicasAfterMove = createReplicaSetAfterMove(
+        containerInfo, src, tgt, currentReplicas);
+    ContainerHealthResult healthResult = replicationManager
+        .getContainerReplicationHealth(containerInfo, replicasAfterMove);
+    if (healthResult.getHealthState()
+        != ContainerHealthResult.HealthState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_NOT_HEALTHY);
+      return ret;
+    }
+    startMove(containerInfo, src, tgt, ret);
+    LOG.debug("Processed a move request for container {}, from {} to {}",
+        cid, src.getUuid(), tgt.getUuid());
+    return ret;
+  }
+
+  /**
+   * Notify Move Manager that a container op has been completed.
+   *
+   * @param containerReplicaOp ContainerReplicaOp which has completed
+   * @param containerID ContainerID for which to complete
+   */
+  private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        if (pendingMoveFuture.containsKey(containerID)) {
+          try {
+            handleSuccessfulAdd(containerID);
+          } catch (ContainerNotFoundException | NodeNotFoundException |
+                   ContainerReplicaNotFoundException e) {
+            LOG.warn("Can not handle successful Add for move", e);
+          }
+        } else {
+          LOG.warn("No matching entry found in pendingMoveFuture for " +
+              "containerID {}. Should not happen", containerID);
+          completeMove(containerID, null);
+        }
+      } else if (
+          opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.COMPLETED);
+      }
+    }
+  }
+
+  /**
+   * Notify Move Manager that a container op has expired.
+   *
+   * @param containerReplicaOp ContainerReplicaOp which has expired
+   * @param containerID ContainerID of the container the op applies to
+   */
+  private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp,
+      ContainerID containerID) {
+    if (!running) {
+      return;
+    }
+
+    MoveDataNodePair mdnp = pendingMoveOps.get(containerID);
+    if (mdnp != null) {
+      PendingOpType opType = containerReplicaOp.getOpType();
+      DatanodeDetails dn = containerReplicaOp.getTarget();
+      if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
+        completeMove(containerID, MoveResult.REPLICATION_FAIL_TIME_OUT);
+      } else if (opType.equals(PendingOpType.DELETE) &&
+          mdnp.getSrc().equals(dn)) {
+        completeMove(containerID, MoveResult.DELETION_FAIL_TIME_OUT);
+      }
+    }
+  }
+
+  private void handleSuccessfulAdd(final ContainerID cid)
+      throws ContainerNotFoundException,
+      ContainerReplicaNotFoundException, NodeNotFoundException {
+    MoveDataNodePair movePair = pendingMoveOps.get(cid);
+    if (movePair == null) {
+      return;
+    }
+    final DatanodeDetails src = movePair.getSrc();
+    Set<ContainerReplica> currentReplicas = containerManager
+        .getContainerReplicas(cid);
+    if (currentReplicas.stream()
+        .noneMatch(r -> r.getDatanodeDetails().equals(src))) {
+      // If the target is present but the source has disappeared somehow,
+      // we can consider the move successful.
+      completeMove(cid, MoveResult.COMPLETED);
+      return;
+    }
+
+    final NodeStatus nodeStatus = replicationManager.getNodeStatus(src);
+    if (nodeStatus.getOperationalState()
+        != HddsProtos.NodeOperationalState.IN_SERVICE) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+      return;
+    }
+    if (!nodeStatus.isHealthy()) {
+      completeMove(cid, MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+      return;
+    }
+
+    Set<ContainerReplica> futureReplicas = currentReplicas.stream()
+        .filter(r -> !r.getDatanodeDetails().equals(src))
+        .collect(Collectors.toSet());

Review Comment:
   Good idea.


