Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2019/10/13 06:50:07 UTC

[hadoop-ozone] 01/01: Create admin commands and protobuf messages to allow decommission / recommission and maintenance commands to be sent from the CLI and update the node status in a skeleton decommission manager

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-2196
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit edda9a6dead848cb16d4691300243d8748b33051
Author: S O'Donnell <so...@cloudera.com>
AuthorDate: Mon Sep 23 18:23:47 2019 +0100

    Create admin commands and protobuf messages to allow decommission / recommission and maintenance commands to be sent from the CLI and update the node status in a skeleton decommission manager
---
 .../hdds/scm/client/ContainerOperationClient.java  |  16 ++
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |  29 +++
 .../protocol/StorageContainerLocationProtocol.java |   7 +
 ...inerLocationProtocolClientSideTranslatorPB.java |  58 +++++
 .../proto/StorageContainerLocationProtocol.proto   |  45 +++-
 .../hdds/scm/node/InvalidHostStringException.java  |  34 +++
 .../hdds/scm/node/NodeDecommissionManager.java     | 286 +++++++++++++++++++++
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  14 +-
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  15 ++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  19 +-
 ...inerLocationProtocolServerSideTranslatorPB.java |  54 ++++
 .../hdds/scm/server/SCMClientProtocolServer.java   |  37 +++
 .../hdds/scm/server/StorageContainerManager.java   |  15 ++
 .../hadoop/hdds/scm/container/MockNodeManager.java |  11 +-
 .../hdds/scm/node/TestNodeDecommissionManager.java | 253 ++++++++++++++++++
 .../hadoop/hdds/scm/node/TestNodeStateManager.java |  17 ++
 .../testutils/ReplicationNodeManagerMock.java      |  34 ++-
 .../org/apache/hadoop/hdds/scm/cli/SCMCLI.java     |   4 +-
 .../hdds/scm/cli/node/DatanodeAdminCommands.java   |  55 ++++
 .../node/DatanodeAdminDecommissionSubCommand.java  |  58 +++++
 .../node/DatanodeAdminMaintenanceSubCommand.java   |  63 +++++
 .../node/DatanodeAdminRecommissionSubCommand.java  |  58 +++++
 .../hadoop/hdds/scm/cli/node/package-info.java     |  23 ++
 .../scm/node/TestDecommissionAndMaintenance.java   | 137 ++++++++++
 24 files changed, 1322 insertions(+), 20 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index c97354f..15fc09b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -210,6 +210,22 @@ public class ContainerOperationClient implements ScmClient {
         poolName);
   }
 
+  @Override
+  public void decommissionNodes(List<String> hosts) throws IOException {
+    storageContainerLocationClient.decommissionNodes(hosts);
+  }
+
+  @Override
+  public void recommissionNodes(List<String> hosts) throws IOException {
+    storageContainerLocationClient.recommissionNodes(hosts);
+  }
+
+  @Override
+  public void startMaintenanceNodes(List<String> hosts, int endHours)
+      throws IOException {
+    storageContainerLocationClient.startMaintenanceNodes(hosts, endHours);
+  }
+
   /**
    * Creates a specified replication pipeline.
    */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 226ceda..8238e84 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -162,6 +162,35 @@ public interface ScmClient extends Closeable {
       HddsProtos.QueryScope queryScope, String poolName) throws IOException;
 
   /**
+   * Allows a list of hosts to be decommissioned. The hosts are identified
+   * by their hostname and optionally port in the format foo.com:port.
+   * @param hosts A list of hostnames, optionally with port
+   * @throws IOException
+   */
+  void decommissionNodes(List<String> hosts) throws IOException;
+
+  /**
+   * Allows a list of hosts in maintenance or decommission states to be placed
+   * back in service. The hosts are identified by their hostname and optionally
+   * port in the format foo.com:port.
+   * @param hosts A list of hostnames, optionally with port
+   * @throws IOException
+   */
+  void recommissionNodes(List<String> hosts) throws IOException;
+
+  /**
+   * Place the list of datanodes into maintenance mode. If a non-zero
+   * endHours value is passed, the hosts will automatically exit maintenance
+   * mode after the given number of hours has passed. The hosts are identified
+   * by their hostname and optionally port in the format foo.com:port.
+   * @param hosts A list of hostnames, optionally with port
+   * @param endHours The number of hours from now after which maintenance ends
+   * @throws IOException
+   */
+  void startMaintenanceNodes(List<String> hosts, int endHours)
+      throws IOException;
+
+  /**
    * Creates a specified replication pipeline.
    * @param type - Type
    * @param factor - Replication factor
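
For illustration, a minimal sketch of driving the three new ScmClient
methods end to end. Only the method names and signatures come from this
change; the surrounding class and the host names are placeholders:

    import java.util.Arrays;
    import org.apache.hadoop.hdds.scm.client.ScmClient;

    public class DatanodeAdminExample {
      public static void run(ScmClient scmClient) throws Exception {
        // Decommission two datanodes, one identified as hostname:port
        scmClient.decommissionNodes(
            Arrays.asList("dn1.example.com", "dn2.example.com:9858"));
        // Place a node into maintenance, ending automatically after 24 hours
        scmClient.startMaintenanceNodes(Arrays.asList("dn3.example.com"), 24);
        // Return all three nodes to service
        scmClient.recommissionNodes(Arrays.asList(
            "dn1.example.com", "dn2.example.com:9858", "dn3.example.com"));
      }
    }
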
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 88db820..91cd40d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -112,6 +112,13 @@ public interface StorageContainerLocationProtocol extends Closeable {
   List<HddsProtos.Node> queryNode(HddsProtos.NodeState state,
       HddsProtos.QueryScope queryScope, String poolName) throws IOException;
 
+  void decommissionNodes(List<String> nodes) throws IOException;
+
+  void recommissionNodes(List<String> nodes) throws IOException;
+
+  void startMaintenanceNodes(List<String> nodes, int endInHours)
+      throws IOException;
+
   /**
    * Notify from client when begin or finish creating objects like pipeline
    * or containers on datanodes.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 01db597..99941d1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -53,6 +53,9 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartMaintenanceNodesRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.RecommissionNodesRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -257,6 +260,61 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   }
 
   /**
+   * Attempts to decommission the list of nodes.
+   * @param nodes The list of hostnames or hostname:ports to decommission
+   * @throws IOException
+   */
+  @Override
+  public void decommissionNodes(List<String> nodes) throws IOException {
+    Preconditions.checkNotNull(nodes);
+    DecommissionNodesRequestProto request =
+        DecommissionNodesRequestProto.newBuilder()
+        .addAllHosts(nodes)
+        .build();
+    submitRequest(Type.DecommissionNodes,
+        builder -> builder.setDecommissionNodesRequest(request));
+  }
+
+  /**
+   * Attempts to recommission the list of nodes.
+   * @param nodes The list of hostnames or hostname:ports to recommission
+   * @throws IOException
+   */
+  @Override
+  public void recommissionNodes(List<String> nodes) throws IOException {
+    Preconditions.checkNotNull(nodes);
+    RecommissionNodesRequestProto request =
+        RecommissionNodesRequestProto.newBuilder()
+            .addAllHosts(nodes)
+            .build();
+    submitRequest(Type.RecommissionNodes,
+        builder -> builder.setRecommissionNodesRequest(request));
+  }
+
+  /**
+   * Attempts to put the list of nodes into maintenance mode.
+   *
+   * @param nodes The list of hostnames or hostname:ports to put into
+   *              maintenance
+   * @param endInHours A number of hours from now after which the nodes will
+   *                   be taken out of maintenance automatically. Passing zero
+   *                   allows the nodes to stay in maintenance indefinitely
+   * @throws IOException
+   */
+  @Override
+  public void startMaintenanceNodes(List<String> nodes, int endInHours)
+      throws IOException {
+    Preconditions.checkNotNull(nodes);
+    StartMaintenanceNodesRequestProto request =
+        StartMaintenanceNodesRequestProto.newBuilder()
+            .addAllHosts(nodes)
+            .setEndInHours(endInHours)
+            .build();
+    submitRequest(Type.StartMaintenanceNodes,
+        builder -> builder.setStartMaintenanceNodesRequest(request));
+  }
+
+  /**
    * Notify from client that creates object on datanodes.
    *
    * @param type  object type
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 8ea72b6..00e58c0 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -59,7 +59,9 @@ message ScmContainerLocationRequest {
   optional StartReplicationManagerRequestProto startReplicationManagerRequest = 21;
   optional StopReplicationManagerRequestProto stopReplicationManagerRequest = 22;
   optional ReplicationManagerStatusRequestProto seplicationManagerStatusRequest = 23;
-
+  optional DecommissionNodesRequestProto decommissionNodesRequest = 24;
+  optional RecommissionNodesRequestProto recommissionNodesRequest = 25;
+  optional StartMaintenanceNodesRequestProto startMaintenanceNodesRequest = 26;
 }
 
 message ScmContainerLocationResponse {
@@ -91,6 +93,9 @@ message ScmContainerLocationResponse {
   optional StartReplicationManagerResponseProto startReplicationManagerResponse = 21;
   optional StopReplicationManagerResponseProto stopReplicationManagerResponse = 22;
   optional ReplicationManagerStatusResponseProto replicationManagerStatusResponse = 23;
+  optional DecommissionNodesResponseProto decommissionNodesResponse = 24;
+  optional RecommissionNodesResponseProto recommissionNodesResponse = 25;
+  optional StartMaintenanceNodesResponseProto startMaintenanceNodesResponse = 26;
   enum Status {
     OK = 1;
     CONTAINER_ALREADY_EXISTS = 2;
@@ -118,6 +123,9 @@ enum Type {
   StartReplicationManager = 16;
   StopReplicationManager = 17;
   GetReplicationManagerStatus = 18;
+  DecommissionNodes = 19;
+  RecommissionNodes = 20;
+  StartMaintenanceNodes = 21;
 }
 
 /**
@@ -225,6 +233,40 @@ message NodeQueryResponseProto {
   repeated Node datanodes = 1;
 }
 
+/*
+  Decommission a list of hosts
+*/
+message DecommissionNodesRequestProto {
+  repeated string hosts = 1;
+}
+
+message DecommissionNodesResponseProto {
+  // empty response
+}
+
+/*
+  Recommission a list of hosts in maintenance or decommission states
+*/
+message RecommissionNodesRequestProto {
+  repeated string hosts = 1;
+}
+
+message RecommissionNodesResponseProto {
+  // empty response
+}
+
+/*
+  Place a list of hosts into maintenance mode
+*/
+message StartMaintenanceNodesRequestProto {
+  repeated string hosts = 1;
+  optional int64 endInHours = 2;
+}
+
+message StartMaintenanceNodesResponseProto {
+  // empty response
+}
+
 /**
   Request to create a replication pipeline.
  */
@@ -326,5 +368,4 @@ message ReplicationManagerStatusResponseProto {
  */
 service StorageContainerLocationProtocolService {
   rpc submitRequest (ScmContainerLocationRequest) returns (ScmContainerLocationResponse);
-
 }
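
Each of these operations is carried over the single submitRequest RPC in
the service above: the Type enum names the command and exactly one of the
optional request fields is set. A hedged sketch of the envelope built for
a decommission call (message, field and enum names are from this .proto;
the cmdType setter is inferred from the server-side dispatch later in
this commit):

    ScmContainerLocationRequest envelope =
        ScmContainerLocationRequest.newBuilder()
            .setCmdType(Type.DecommissionNodes)
            .setDecommissionNodesRequest(
                DecommissionNodesRequestProto.newBuilder()
                    .addHosts("dn1.example.com:9858")
                    .build())
            .build();
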
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/InvalidHostStringException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/InvalidHostStringException.java
new file mode 100644
index 0000000..c4046c1
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/InvalidHostStringException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown by the NodeDecommissionManager when it encounters
+ * host strings it does not expect or understand.
+ */
+
+public class InvalidHostStringException extends IOException {
+  public InvalidHostStringException(String msg) {
+    super(msg);
+  }
+
+  public InvalidHostStringException(String msg, Exception e) {
+    super(msg, e);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
new file mode 100644
index 0000000..60813dd
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Class used to manage datanodes scheduled for maintenance or decommission.
+ */
+public class NodeDecommissionManager {
+
+  private NodeManager nodeManager;
+  private PipelineManager pipeLineManager;
+  private ContainerManager containerManager;
+  private OzoneConfiguration conf;
+  private boolean useHostnames;
+
+  private List<DatanodeDetails> pendingNodes = new LinkedList<>();
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NodeDecommissionManager.class);
+
+
+  static class HostDefinition {
+    private String rawHostname;
+    private String hostname;
+    private int port;
+
+    HostDefinition(String hostname) throws InvalidHostStringException {
+      this.rawHostname = hostname;
+      parseHostname();
+    }
+
+    public String getRawHostname() {
+      return rawHostname;
+    }
+
+    public String getHostname() {
+      return hostname;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+    private void parseHostname() throws InvalidHostStringException {
+      try {
+        // A URI *must* have a scheme, so just create a fake one
+        URI uri = new URI("my://" + rawHostname.trim());
+        this.hostname = uri.getHost();
+        this.port = uri.getPort();
+
+        if (this.hostname == null) {
+          throw new InvalidHostStringException("The string " + rawHostname +
+              " does not contain a valid hostname or hostname:port definition");
+        }
+      } catch (URISyntaxException e) {
+        throw new InvalidHostStringException(
+            "Unable to parse the host string " + rawHostname, e);
+      }
+    }
+  }
+
+  private List<DatanodeDetails> mapHostnamesToDatanodes(List<String> hosts)
+      throws InvalidHostStringException {
+    List<DatanodeDetails> results = new LinkedList<>();
+    for (String hostString : hosts) {
+      HostDefinition host = new HostDefinition(hostString);
+      InetAddress addr;
+      try {
+        addr = InetAddress.getByName(host.getHostname());
+      } catch (UnknownHostException e) {
+        throw new InvalidHostStringException("Unable to resolve the host "
+            +host.getRawHostname(), e);
+      }
+      String dnsName;
+      if (useHostnames) {
+        dnsName = addr.getHostName();
+      } else {
+        dnsName = addr.getHostAddress();
+      }
+      List<DatanodeDetails> found = nodeManager.getNodesByAddress(dnsName);
+      if (found.size() == 0) {
+        throw new InvalidHostStringException("The string " +
+            host.getRawHostname() + " resolved to " + dnsName +
+            " but was not found in SCM");
+      } else if (found.size() == 1) {
+        if (host.getPort() != -1 &&
+            !validateDNPortMatch(host.getPort(), found.get(0))) {
+          throw new InvalidHostStringException("The string " +
+              host.getRawHostname() + " matched a single datanode, but the " +
+              "given port is not used by that datanode");
+        }
+        results.add(found.get(0));
+      } else if (found.size() > 1) {
+        DatanodeDetails match = null;
+        for (DatanodeDetails dn : found) {
+          if (validateDNPortMatch(host.getPort(), dn)) {
+            match = dn;
+            break;
+          }
+        }
+        if (match == null) {
+          throw new InvalidHostStringException("The string " +
+              host.getRawHostname() + " matched multiple datanodes, but no " +
+              "datanode port matched the given port");
+        }
+        results.add(match);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Check if the passed port is used by the given DatanodeDetails object. If
+   * it is, return true, otherwise return false.
+   * @param port Port number to check if it is used by the datanode
+   * @param dn Datanode to check if it is using the given port
+   * @return True if port is used by the datanode. False otherwise.
+   */
+  private boolean validateDNPortMatch(int port, DatanodeDetails dn) {
+    for (DatanodeDetails.Port p : dn.getPorts()) {
+      if (p.getValue() == port) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public NodeDecommissionManager(OzoneConfiguration conf,
+      NodeManager nodeManager, PipelineManager pipelineManager,
+      ContainerManager containerManager) {
+    this.conf = conf;
+    this.nodeManager = nodeManager;
+    this.pipeLineManager = pipelineManager;
+    this.containerManager = containerManager;
+
+    useHostnames = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+  }
+
+  public synchronized void decommissionNodes(List<String> nodes)
+      throws InvalidHostStringException {
+    List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
+    for (DatanodeDetails dn : dns) {
+      try {
+        startDecommission(dn);
+      } catch (NodeNotFoundException e) {
+        // We already validated the host strings and retrieved the DnDetails
+        // object from the node manager. Therefore we should never get a
+        // NodeNotFoundException here except if the node is removed in the
+        // very short window between validation and starting decom. Therefore
+        // log a warning and ignore the exception.
+        LOG.warn("The host {} was not found in SCM. Ignoring the request to "+
+            "decommission it", dn.getHostName());
+      }
+    }
+  }
+
+  public synchronized void startDecommission(DatanodeDetails dn)
+      throws NodeNotFoundException {
+    NodeStatus nodeStatus = getNodeStatus(dn);
+    NodeOperationalState opState = nodeStatus.getOperationalState();
+    LOG.info("In decommission the op state is {}", opState);
+    if (opState != NodeOperationalState.DECOMMISSIONING
+        && opState != NodeOperationalState.DECOMMISSIONED) {
+      LOG.info("Starting Decommission for node {}", dn);
+      nodeManager.setNodeOperationalState(
+          dn, NodeOperationalState.DECOMMISSIONING);
+      pendingNodes.add(dn);
+    } else {
+      LOG.info("Start Decommission called on node {} in state {}. Nothing to "+
+          "do.", dn, opState);
+    }
+  }
+
+  public synchronized void recommissionNodes(List<String> nodes)
+      throws InvalidHostStringException {
+    List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
+    for (DatanodeDetails dn : dns) {
+      try {
+        recommission(dn);
+      } catch (NodeNotFoundException e) {
+        // We already validated the host strings and retrieved the DnDetails
+        // object from the node manager. Therefore we should never get a
+        // NodeNotFoundException here except if the node is removed in the
+        // very short window between validation and starting decom. Therefore
+        // log a warning and ignore the exception.
+        LOG.warn("The host {} was not found in SCM. Ignoring the request to "+
+            "recommission it", dn.getHostName());
+      }
+    }
+  }
+
+  public synchronized void recommission(DatanodeDetails dn)
+      throws NodeNotFoundException {
+    NodeStatus nodeStatus = getNodeStatus(dn);
+    NodeOperationalState opState = nodeStatus.getOperationalState();
+    if (opState != NodeOperationalState.IN_SERVICE) {
+      nodeManager.setNodeOperationalState(
+          dn, NodeOperationalState.IN_SERVICE);
+      pendingNodes.remove(dn);
+      LOG.info("Recommissioned node {}", dn);
+    } else {
+      LOG.info("Recommission called on node {} with state {}. "+
+          "Nothing to do.", dn, opState);
+    }
+  }
+
+  public synchronized void startMaintenanceNodes(List<String> nodes,
+      int endInHours) throws InvalidHostStringException {
+    List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
+    for (DatanodeDetails dn : dns) {
+      try {
+        startMaintenance(dn, endInHours);
+      } catch (NodeNotFoundException e) {
+        // We already validated the host strings and retrieved the DnDetails
+        // object from the node manager. Therefore we should never get a
+        // NodeNotFoundException here except if the node is removed in the
+        // very short window between validation and starting decom. Therefore
+        // log a warning and ignore the exception.
+        LOG.warn("The host {} was not found in SCM. Ignoring the request to "+
+            "start maintenance on it", dn.getHostName());
+      }
+    }
+  }
+
+  // TODO - If startMaintenance is called on a host already in maintenance,
+  //        then we should update the end time?
+  public synchronized void startMaintenance(DatanodeDetails dn, int endInHours)
+      throws NodeNotFoundException {
+    NodeStatus nodeStatus = getNodeStatus(dn);
+    NodeOperationalState opState = nodeStatus.getOperationalState();
+    if (opState != NodeOperationalState.ENTERING_MAINTENANCE &&
+        opState != NodeOperationalState.IN_MAINTENANCE) {
+      nodeManager.setNodeOperationalState(
+          dn, NodeOperationalState.ENTERING_MAINTENANCE);
+      pendingNodes.add(dn);
+      LOG.info("Starting Maintenance for node {}", dn);
+    } else {
+      LOG.info("Starting Maintenance called on node {} with state {}. "+
+          "Nothing to do.", dn, opState);
+    }
+  }
+
+  private NodeStatus getNodeStatus(DatanodeDetails dn)
+      throws NodeNotFoundException {
+    NodeStatus nodeStatus = nodeManager.getNodeStatus(dn);
+    if (nodeStatus == null) {
+      throw new NodeNotFoundException();
+    }
+    return nodeStatus;
+  }
+
+}
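
HostDefinition above splits "host" or "host:port" strings by leaning on
java.net.URI: a URI must carry a scheme, so a fake one is prefixed and
the host and port are read back, with -1 meaning no port was supplied. A
self-contained sketch of that trick (standalone demo code, not part of
this change):

    import java.net.URI;
    import java.net.URISyntaxException;

    public class HostParseDemo {
      public static void main(String[] args) throws URISyntaxException {
        URI withPort = new URI("my://" + "foo.example.com:9858".trim());
        System.out.println(withPort.getHost()); // foo.example.com
        System.out.println(withPort.getPort()); // 9858

        URI noPort = new URI("my://" + "foo.example.com".trim());
        System.out.println(noPort.getPort());   // -1 (no port supplied)
      }
    }
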
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 205f2e1..252c38f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -125,11 +125,19 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails);
 
   /**
-   * Returns the node state of a specific node.
+   * Returns the node status of a specific node.
    * @param datanodeDetails DatanodeDetails
-   * @return Healthy/Stale/Dead.
+   * @return NodeStatus for the node
    */
-  NodeState getNodeState(DatanodeDetails datanodeDetails);
+  NodeStatus getNodeStatus(DatanodeDetails datanodeDetails);
+
+  /**
+   * Set the operation state of a node.
+   * @param datanodeDetails The datanode to set the new state for
+   * @param newState The new operational state for the node
+   */
+  void setNodeOperationalState(DatanodeDetails datanodeDetails,
+      NodeOperationalState newState) throws NodeNotFoundException;
 
   /**
    * Get set of pipelines a datanode is part of.
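
With this change a node's status becomes a pair: an operational state
(IN_SERVICE, DECOMMISSIONING, ENTERING_MAINTENANCE and so on) alongside
the existing health state (healthy/stale/dead). A hedged sketch of the
two new methods used together (assumes a registered DatanodeDetails dn
and a caller that handles NodeNotFoundException; the assert is
illustrative):

    nodeManager.setNodeOperationalState(dn,
        NodeOperationalState.DECOMMISSIONING);
    NodeStatus status = nodeManager.getNodeStatus(dn);
    assert status.getOperationalState()
        == NodeOperationalState.DECOMMISSIONING;
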
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 1e1a50c..4177b63 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -395,6 +395,21 @@ public class NodeStateManager implements Runnable, Closeable {
   }
 
   /**
+   * Sets the operational state of the given node. Intended to be called when
+   * a node is being decommissioned etc.
+   *
+   * @param dn The datanode having its state set
+   * @param newState The new operational State of the node.
+   */
+  public void setNodeOperationalState(DatanodeDetails dn,
+      NodeOperationalState newState) throws NodeNotFoundException {
+    DatanodeInfo dni = nodeStateMap.getNodeInfo(dn.getUuid());
+    if (dni.getNodeStatus().getOperationalState() != newState) {
+      nodeStateMap.updateNodeOperationalState(dn.getUuid(), newState);
+    }
+  }
+
+  /**
    * Gets set of pipelineID a datanode belongs to.
    * @param dnId - Datanode ID
    * @return Set of PipelineID
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index e48eda1..ccf1d0e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -213,15 +213,15 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
-   * Returns the node state of a specific node.
+   * Returns the node status of a specific node.
    *
    * @param datanodeDetails Datanode Details
-   * @return Healthy/Stale/Dead/Unknown.
+   * @return NodeStatus for the node
    */
   @Override
-  public NodeState getNodeState(DatanodeDetails datanodeDetails) {
+  public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails) {
     try {
-      return nodeStateManager.getNodeStatus(datanodeDetails).getHealth();
+      return nodeStateManager.getNodeStatus(datanodeDetails);
     } catch (NodeNotFoundException e) {
       // TODO: should we throw NodeNotFoundException?
       return null;
@@ -229,6 +229,17 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
+   * Set the operation state of a node.
+   * @param datanodeDetails The datanode to set the new state for
+   * @param newState The new operational state for the node
+   */
+  @Override
+  public void setNodeOperationalState(DatanodeDetails datanodeDetails,
+      NodeOperationalState newState) throws NodeNotFoundException {
+    nodeStateManager.setNodeOperationalState(datanodeDetails, newState);
+  }
+
+  /**
    * Closes this stream and releases any system resources associated with it. If
    * the stream is already closed then invoking this method has no effect.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 0d2f470..53bd95d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -57,6 +57,18 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerLocationProtocolProtos.RecommissionNodesRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerLocationProtocolProtos.RecommissionNodesResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerLocationProtocolProtos.StartMaintenanceNodesRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerLocationProtocolProtos.StartMaintenanceNodesResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
@@ -214,6 +226,27 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
             .setReplicationManagerStatusResponse(getReplicationManagerStatus(
                 request.getSeplicationManagerStatusRequest()))
             .build();
+      case DecommissionNodes:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setDecommissionNodesResponse(decommissionNodes(
+                request.getDecommissionNodesRequest()))
+            .build();
+      case RecommissionNodes:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setRecommissionNodesResponse(recommissionNodes(
+                request.getRecommissionNodesRequest()))
+            .build();
+      case StartMaintenanceNodes:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setStartMaintenanceNodesResponse(startMaintenanceNodes(
+                request.getStartMaintenanceNodesRequest()))
+            .build();
       default:
         throw new IllegalArgumentException(
             "Unknown command type: " + request.getCmdType());
@@ -390,4 +423,25 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
         .setIsRunning(impl.getReplicationManagerStatus()).build();
   }
 
+  public DecommissionNodesResponseProto decommissionNodes(
+      DecommissionNodesRequestProto request) throws IOException {
+    impl.decommissionNodes(request.getHostsList());
+    return DecommissionNodesResponseProto.newBuilder()
+        .build();
+  }
+
+  public RecommissionNodesResponseProto recommissionNodes(
+      RecommissionNodesRequestProto request) throws IOException {
+    impl.recommissionNodes(request.getHostsList());
+    return RecommissionNodesResponseProto.newBuilder().build();
+  }
+
+  public StartMaintenanceNodesResponseProto startMaintenanceNodes(
+      StartMaintenanceNodesRequestProto request) throws IOException {
+    impl.startMaintenanceNodes(request.getHostsList(),
+        (int)request.getEndInHours());
+    return StartMaintenanceNodesResponseProto.newBuilder()
+        .build();
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index d982507..38d846f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -366,6 +366,43 @@ public class SCMClientProtocolServer implements
   }
 
   @Override
+  public void decommissionNodes(List<String> nodes) throws IOException {
+    String remoteUser = getRpcRemoteUsername();
+    try {
+      getScm().checkAdminAccess(remoteUser);
+      scm.getScmDecommissionManager().decommissionNodes(nodes);
+    } catch (Exception ex) {
+      LOG.error("Failed to decommission nodes", ex);
+      throw ex;
+    }
+  }
+
+  @Override
+  public void recommissionNodes(List<String> nodes) throws IOException {
+    String remoteUser = getRpcRemoteUsername();
+    try {
+      getScm().checkAdminAccess(remoteUser);
+      scm.getScmDecommissionManager().recommissionNodes(nodes);
+    } catch (Exception ex) {
+      LOG.error("Failed to recommission nodes", ex);
+      throw ex;
+    }
+  }
+
+  @Override
+  public void startMaintenanceNodes(List<String> nodes, int endInHours)
+      throws IOException {
+    String remoteUser = getRpcRemoteUsername();
+    try {
+      getScm().checkAdminAccess(remoteUser);
+      scm.getScmDecommissionManager().startMaintenanceNodes(nodes, endInHours);
+    } catch (Exception ex) {
+      LOG.error("Failed to place nodes into maintenance mode", ex);
+      throw ex;
+    }
+  }
+
+  @Override
   public void notifyObjectStageChange(StorageContainerLocationProtocolProtos
       .ObjectStageChangeRequestProto.Type type, long id,
       StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto.Op
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 702102b..2cc1cde 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
+import org.apache.hadoop.hdds.scm.node.NodeDecommissionManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler;
@@ -160,6 +161,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private ContainerManager containerManager;
   private BlockManager scmBlockManager;
   private final SCMStorageConfig scmStorageConfig;
+  private NodeDecommissionManager scmDecommissionManager;
 
   private SCMMetadataStore scmMetadataStore;
 
@@ -335,6 +337,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         clientProtocolServer, scmBlockManager, replicationManager,
         pipelineManager);
 
+    scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
+        pipelineManager, containerManager);
+
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
@@ -928,6 +933,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
+   * Returns the node decommission manager.
+   *
+   * @return NodeDecommissionManager The decommission manager used by
+   *         the SCM
+   */
+  public NodeDecommissionManager getScmDecommissionManager() {
+    return scmDecommissionManager;
+  }
+
+  /**
    * Returns SCM container manager.
    */
   @VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 20a8b74..25cf504 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -262,11 +262,20 @@ public class MockNodeManager implements NodeManager {
    * @return Healthy/Stale/Dead.
    */
   @Override
-  public HddsProtos.NodeState getNodeState(DatanodeDetails dd) {
+  public NodeStatus getNodeStatus(DatanodeDetails dd) {
     return null;
   }
 
   /**
+   * Set the operation state of a node.
+   * @param datanodeDetails The datanode to set the new state for
+   * @param newState The new operational state for the node
+   */
+  public void setNodeOperationalState(DatanodeDetails datanodeDetails,
+      HddsProtos.NodeOperationalState newState) throws NodeNotFoundException {
+  }
+
+  /**
    * Get set of pipelines a datanode is part of.
    * @param dnId - datanodeID
    * @return Set of PipelineID
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
new file mode 100644
index 0000000..4492a6e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.Arrays;
+import java.util.ArrayList;
+import static junit.framework.TestCase.assertEquals;
+import static org.assertj.core.api.Fail.fail;
+
+/**
+ * Unit tests for the decommission manager.
+ */
+
+public class TestNodeDecommissionManager {
+
+  private NodeDecommissionManager decom;
+  private StorageContainerManager scm;
+  private NodeManager nodeManager;
+  private OzoneConfiguration conf;
+  private String storageDir;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    storageDir = GenericTestUtils.getTempPath(
+        TestNodeDecommissionManager.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+    nodeManager = createNodeManager(conf);
+    decom = new NodeDecommissionManager(conf, nodeManager, null, null);
+  }
+
+  @Test
+  public void testHostStringsParseCorrectly()
+      throws InvalidHostStringException {
+    NodeDecommissionManager.HostDefinition def =
+        new NodeDecommissionManager.HostDefinition("foobar");
+    assertEquals("foobar", def.getHostname());
+    assertEquals(-1, def.getPort());
+
+    def = new NodeDecommissionManager.HostDefinition(" foobar ");
+    assertEquals("foobar", def.getHostname());
+    assertEquals(-1, def.getPort());
+
+    def = new NodeDecommissionManager.HostDefinition("foobar:1234");
+    assertEquals("foobar", def.getHostname());
+    assertEquals(1234, def.getPort());
+
+    def = new NodeDecommissionManager.HostDefinition(
+        "foobar.mycompany.com:1234");
+    assertEquals("foobar.mycompany.com", def.getHostname());
+    assertEquals(1234, def.getPort());
+
+    try {
+      def = new NodeDecommissionManager.HostDefinition("foobar:abcd");
+      fail("InvalidHostStringException should have been thrown");
+    } catch (InvalidHostStringException e) {
+    }
+  }
+
+  @Test
+  public void testAnyInvalidHostThrowsException()
+      throws InvalidHostStringException {
+    List<DatanodeDetails> dns = generateDatanodes();
+
+    // Try to decommission a host that does exist, but give an incorrect port
+    try {
+      decom.decommissionNodes(Arrays.asList(dns.get(1).getIpAddress()+":10"));
+      fail("InvalidHostStringException expected");
+    } catch (InvalidHostStringException e) {
+    }
+
+    // Try to decommission a host that does not exist
+    try {
+      decom.decommissionNodes(Arrays.asList("123.123.123.123"));
+      fail("InvalidHostStringException expected");
+    } catch (InvalidHostStringException e) {
+    }
+
+    // Try to decommission a host that does exist and a host that does not
+    try {
+      decom.decommissionNodes(Arrays.asList(
+          dns.get(1).getIpAddress(), "123,123,123,123"));
+      fail("InvalidHostStringException expected");
+    } catch (InvalidHostStringException e) {
+    }
+
+    // Try to decommission a host with many DNs on the address with no port
+    try {
+      decom.decommissionNodes(Arrays.asList(
+          dns.get(0).getIpAddress()));
+      fail("InvalidHostStringException expected");
+    } catch (InvalidHostStringException e) {
+    }
+
+    // Try to decommission a host with many DNs on the address with a port
+    // that does not exist
+    try {
+      decom.decommissionNodes(Arrays.asList(
+          dns.get(0).getIpAddress()+":10"));
+      fail("InvalidHostStringException expected");
+    } catch (InvalidHostStringException e) {
+    }
+  }
+
+  @Test
+  public void testNodesCanBeDecommissionedAndRecommissioned()
+      throws InvalidHostStringException {
+    List<DatanodeDetails> dns = generateDatanodes();
+
+    // Decommission 2 valid nodes
+    decom.decommissionNodes(Arrays.asList(dns.get(1).getIpAddress(),
+        dns.get(2).getIpAddress()));
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        nodeManager.getNodeStatus(dns.get(1)).getOperationalState());
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        nodeManager.getNodeStatus(dns.get(2)).getOperationalState());
+
+    // Running the command again gives no error - nodes already decommissioning
+    // are silently ignored.
+    decom.decommissionNodes(Arrays.asList(dns.get(1).getIpAddress(),
+        dns.get(2).getIpAddress()));
+
+    // Attempt to decommission dn(10), which shares its IP with another host
+    // and has its ports hardcoded to 3456, 4567 and 5678
+    DatanodeDetails multiDn = dns.get(10);
+    String multiAddr =
+        multiDn.getIpAddress()+":"+multiDn.getPorts().get(0).getValue();
+    decom.decommissionNodes(Arrays.asList(multiAddr));
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        nodeManager.getNodeStatus(multiDn).getOperationalState());
+
+    // Recommission all 3 hosts
+    decom.recommissionNodes(Arrays.asList(
+        multiAddr, dns.get(1).getIpAddress(), dns.get(2).getIpAddress()));
+    assertEquals(HddsProtos.NodeOperationalState.IN_SERVICE,
+        nodeManager.getNodeStatus(dns.get(1)).getOperationalState());
+    assertEquals(HddsProtos.NodeOperationalState.IN_SERVICE,
+        nodeManager.getNodeStatus(dns.get(2)).getOperationalState());
+    assertEquals(HddsProtos.NodeOperationalState.IN_SERVICE,
+        nodeManager.getNodeStatus(dns.get(10)).getOperationalState());
+  }
+
+  @Test
+  public void testNodesCanBePutIntoMaintenanceAndRecommissioned()
+      throws InvalidHostStringException {
+    List<DatanodeDetails> dns = generateDatanodes();
+
+    // Put 2 valid nodes into maintenance
+    decom.startMaintenanceNodes(Arrays.asList(dns.get(1).getIpAddress(),
+        dns.get(2).getIpAddress()), 100);
+    assertEquals(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
+        nodeManager.getNodeStatus(dns.get(1)).getOperationalState());
+    assertEquals(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
+        nodeManager.getNodeStatus(dns.get(2)).getOperationalState());
+
+    // Running the command again gives no error - nodes already entering
+    // maintenance are silently ignored.
+    decom.startMaintenanceNodes(Arrays.asList(dns.get(1).getIpAddress(),
+        dns.get(2).getIpAddress()), 100);
+
+    // Attempt to start maintenance on dn(10), which shares its IP with
+    // another host and has its ports hardcoded to 3456, 4567 and 5678
+    DatanodeDetails multiDn = dns.get(10);
+    String multiAddr =
+        multiDn.getIpAddress()+":"+multiDn.getPorts().get(0).getValue();
+    decom.startMaintenanceNodes(Arrays.asList(multiAddr), 100);
+    assertEquals(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
+        nodeManager.getNodeStatus(multiDn).getOperationalState());
+
+    // Recommission all 3 hosts
+    decom.recommissionNodes(Arrays.asList(
+        multiAddr, dns.get(1).getIpAddress(), dns.get(2).getIpAddress()));
+    assertEquals(HddsProtos.NodeOperationalState.IN_SERVICE,
+        nodeManager.getNodeStatus(dns.get(1)).getOperationalState());
+    assertEquals(HddsProtos.NodeOperationalState.IN_SERVICE,
+        nodeManager.getNodeStatus(dns.get(2)).getOperationalState());
+    assertEquals(HddsProtos.NodeOperationalState.IN_SERVICE,
+        nodeManager.getNodeStatus(dns.get(10)).getOperationalState());
+  }
+
+  private SCMNodeManager createNodeManager(OzoneConfiguration config)
+      throws IOException, AuthenticationException {
+    scm = HddsTestUtils.getScm(config);
+    return (SCMNodeManager) scm.getScmNodeManager();
+  }
+
+  /**
+   * Generate a list of random DNs and return the list. A total of 11 DNs
+   * will be generated and registered with the node manager. The DNs at index
+   * 0 and 10 will share the same IP and host, while the rest will have unique
+   * IPs and hosts. The DN at index 10 has 3 hardcoded ports of 3456, 4567 and
+   * 5678. All other DNs will have ports set to 0.
+   * @return The list of DatanodeDetails generated
+   */
+  private List<DatanodeDetails> generateDatanodes() {
+    List<DatanodeDetails> dns = new ArrayList<>();
+    for (int i=0; i<10; i++) {
+      DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+      dns.add(dn);
+      nodeManager.register(dn, null, null);
+    }
+    // We have 10 random DNs, we want to create another one that is on the same
+    // host as some of the others.
+    DatanodeDetails multiDn = dns.get(0);
+
+    DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
+    builder.setUuid(UUID.randomUUID().toString())
+        .setHostName(multiDn.getHostName())
+        .setIpAddress(multiDn.getIpAddress())
+        .addPort(DatanodeDetails.newPort(
+            DatanodeDetails.Port.Name.STANDALONE, 3456))
+        .addPort(DatanodeDetails.newPort(
+            DatanodeDetails.Port.Name.RATIS, 4567))
+        .addPort(DatanodeDetails.newPort(
+            DatanodeDetails.Port.Name.REST, 5678))
+        .setNetworkLocation(multiDn.getNetworkLocation());
+
+    DatanodeDetails dn = builder.build();
+    nodeManager.register(dn, null, null);
+    dns.add(dn);
+    return dns;
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java
index bc28a43..9fbf251 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
@@ -185,6 +186,22 @@ public class TestNodeStateManager {
         eventPublisher.getLastEvent().getName());
   }
 
+  @Test
+  public void testNodeOpStateCanBeSet()
+      throws NodeAlreadyExistsException, NodeNotFoundException {
+    DatanodeDetails dn = generateDatanode();
+    nsm.addNode(dn);
+
+    nsm.setNodeOperationalState(dn,
+        HddsProtos.NodeOperationalState.DECOMMISSIONED);
+
+    NodeStatus newStatus = nsm.getNodeStatus(dn);
+    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONED,
+        newStatus.getOperationalState());
+    assertEquals(NodeState.HEALTHY,
+        newStatus.getHealth());
+  }
+
   private DatanodeDetails generateDatanode() {
     String uuid = UUID.randomUUID().toString();
     return DatanodeDetails.newBuilder().setUuid(uuid).build();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index a48b2a0..5a2e3c2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -52,17 +52,17 @@ import java.util.LinkedList;
  * A Node Manager to test replication.
  */
 public class ReplicationNodeManagerMock implements NodeManager {
-  private final Map<DatanodeDetails, NodeState> nodeStateMap;
+  private final Map<DatanodeDetails, NodeStatus> nodeStateMap;
   private final CommandQueue commandQueue;
 
   /**
    * A list of Datanodes and current states.
-   * @param nodeState A node state map.
+   * @param nodeStatus A node state map.
    */
-  public ReplicationNodeManagerMock(Map<DatanodeDetails, NodeState> nodeState,
+  public ReplicationNodeManagerMock(Map<DatanodeDetails, NodeStatus> nodeStatus,
                                     CommandQueue commandQueue) {
-    Preconditions.checkNotNull(nodeState);
-    this.nodeStateMap = nodeState;
+    Preconditions.checkNotNull(nodeStatus);
+    this.nodeStateMap = nodeStatus;
     this.commandQueue = commandQueue;
   }
 
@@ -179,11 +179,27 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * @return Healthy/Stale/Dead.
    */
   @Override
-  public NodeState getNodeState(DatanodeDetails dd) {
+  public NodeStatus getNodeStatus(DatanodeDetails dd) {
     return nodeStateMap.get(dd);
   }
 
   /**
+   * Set the operation state of a node.
+   * @param dd The datanode to set the new state for
+   * @param newState The new operational state for the node
+   */
+  @Override
+  public void setNodeOperationalState(DatanodeDetails dd,
+      HddsProtos.NodeOperationalState newState) throws NodeNotFoundException {
+    NodeStatus currentStatus = nodeStateMap.get(dd);
+    if (currentStatus != null) {
+      nodeStateMap.put(dd, new NodeStatus(newState, currentStatus.getHealth()));
+    } else {
+      throw new NodeNotFoundException();
+    }
+  }
+
+  /**
    * Get set of pipelines a datanode is part of.
    * @param dnId - datanodeID
    * @return Set of PipelineID
@@ -313,10 +329,10 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * Adds a node to the existing Node manager. This is used only for test
    * purposes.
    * @param id DatanodeDetails
-   * @param state State you want to put that node to.
+   * @param status State you want to put that node to.
    */
-  public void addNode(DatanodeDetails id, NodeState state) {
-    nodeStateMap.put(id, state);
+  public void addNode(DatanodeDetails id, NodeStatus status) {
+    nodeStateMap.put(id, status);
   }
 
   @Override
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
index ff30eca..b095cd1 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.cli.container.ContainerCommands;
 import org.apache.hadoop.hdds.scm.cli.pipeline.PipelineCommands;
+import org.apache.hadoop.hdds.scm.cli.node.DatanodeAdminCommands;
 import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -76,7 +77,8 @@ import picocli.CommandLine.Option;
         ContainerCommands.class,
         PipelineCommands.class,
         TopologySubcommand.class,
-        ReplicationManagerCommands.class
+        ReplicationManagerCommands.class,
+        DatanodeAdminCommands.class
     },
     mixinStandardHelpOptions = true)
 public class SCMCLI extends GenericCli {
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminCommands.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminCommands.java
new file mode 100644
index 0000000..dc7a6f4
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminCommands.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.node;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.cli.MissingSubcommandException;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ParentCommand;
+import org.apache.hadoop.hdds.scm.cli.SCMCLI;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Subcommand to group datanode admin related operations.
+ */
+@Command(
+    name = "dnadmin",
+    description = "Datanode Administration specific operations",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class,
+    subcommands = {
+        DatanodeAdminDecommissionSubCommand.class,
+        DatanodeAdminMaintenanceSubCommand.class,
+        DatanodeAdminRecommissionSubCommand.class
+    })
+public class DatanodeAdminCommands implements Callable<Void> {
+
+  @ParentCommand
+  private SCMCLI parent;
+
+  public SCMCLI getParent() {
+    return parent;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    throw new MissingSubcommandException(
+        this.parent.getCmd().getSubcommands().get("dnadmin"));
+  }
+}
\ No newline at end of file
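
Once registered in SCMCLI as above, the new group is expected to be reachable
through the standard scmcli entry point, roughly as follows (host names are
illustrative, and the exact launcher depends on how scmcli is packaged; per
the integration test further below, a host may also be given in host:port
form):

    ozone scmcli dnadmin decommission dn1.example.com dn2.example.com
    ozone scmcli dnadmin recommission dn1.example.com dn2.example.com
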
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminDecommissionSubCommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminDecommissionSubCommand.java
new file mode 100644
index 0000000..1406603
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminDecommissionSubCommand.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.node;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ParentCommand;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Decommission one or more datanodes.
+ */
+@Command(
+    name = "decommission",
+    description = "Decommission a datanode",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class DatanodeAdminDecommissionSubCommand implements Callable<Void> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeAdminDecommissionSubCommand.class);
+
+  @CommandLine.Parameters(description = "List of fully qualified host names")
+  private List<String> hosts = new ArrayList<>();
+
+  @ParentCommand
+  private DatanodeAdminCommands parent;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.getParent().createScmClient()) {
+      scmClient.decommissionNodes(hosts);
+      return null;
+    }
+  }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminMaintenanceSubCommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminMaintenanceSubCommand.java
new file mode 100644
index 0000000..2e4b2b5
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminMaintenanceSubCommand.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.node;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ParentCommand;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Place one or more datanodes into Maintenance Mode.
+ */
+@Command(
+    name = "maintenance",
+    description = "Put a datanode into Maintenance Mode",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class DatanodeAdminMaintenanceSubCommand implements Callable<Void> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeAdminMaintenanceSubCommand.class);
+
+  @CommandLine.Parameters(description = "List of fully qualified host names")
+  private List<String> hosts = new ArrayList<>();
+
+  @CommandLine.Option(names = {"--end"},
+      description = "Automatically end maintenance after the given number " +
+          "of hours. By default (0), maintenance must be ended manually.")
+  private int endInHours = 0;
+
+  @ParentCommand
+  private DatanodeAdminCommands parent;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.getParent().createScmClient()) {
+      scmClient.startMaintenanceNodes(hosts, endInHours);
+      return null;
+    }
+  }
+}
\ No newline at end of file
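
The maintenance subcommand follows the same pattern; a sketch with an
illustrative host name and a 24 hour window is below. Per the --end option
above, omitting it (or passing 0) keeps the node in maintenance until it is
returned to service with the recommission subcommand that follows:

    ozone scmcli dnadmin maintenance dn1.example.com --end 24
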
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminRecommissionSubCommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminRecommissionSubCommand.java
new file mode 100644
index 0000000..eaa1280
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/DatanodeAdminRecommissionSubCommand.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.node;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ParentCommand;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Place decommissioned or maintenance nodes back into service.
+ */
+@Command(
+    name = "recommission",
+    description = "Return a datanode to service",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class DatanodeAdminRecommissionSubCommand implements Callable<Void> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeAdminRecommissionSubCommand.class);
+
+  @CommandLine.Parameters(description = "List of fully qualified host names")
+  private List<String> hosts = new ArrayList<>();
+
+  @ParentCommand
+  private DatanodeAdminCommands parent;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.getParent().createScmClient()) {
+      scmClient.recommissionNodes(hosts);
+      return null;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/package-info.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/package-info.java
new file mode 100644
index 0000000..dfb04b8
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/node/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * SCM related CLI tools for Datanode Admin: decommission, recommission
+ * and maintenance of datanodes.
+ */
+package org.apache.hadoop.hdds.scm.cli.node;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
new file mode 100644
index 0000000..bada595
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static junit.framework.TestCase.assertEquals;
+import static org.apache.hadoop.hdds.HddsConfigKeys.*;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
+
+/**
+ * Test decommission and maintenance operations through the SCM client.
+ */
+
+public class TestDecommissionAndMaintenance {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDecommissionAndMaintenance.class);
+
+  private static int numOfDatanodes = 5;
+  private MiniOzoneCluster cluster;
+
+  private ContainerOperationClient scmClient;
+
+  @Before
+  public void setUp() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    final int interval = 100;
+
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+        interval, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(numOfDatanodes)
+        .build();
+    cluster.waitForClusterToBeReady();
+    scmClient = new ContainerOperationClient(cluster
+        .getStorageContainerLocationClient(),
+        new XceiverClientManager(conf));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testNodeCanBeDecommMaintAndRecommissioned()
+      throws IOException {
+    NodeManager nm = cluster.getStorageContainerManager().getScmNodeManager();
+
+    List<DatanodeDetails> dns = nm.getAllNodes();
+    scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dns.get(0))));
+
+    // Ensure one node is decommissioning
+    List<DatanodeDetails> decomNodes = nm.getNodes(
+        HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        HddsProtos.NodeState.HEALTHY);
+    assertEquals(1, decomNodes.size());
+
+    scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dns.get(0))));
+
+    // Ensure zero nodes are now decommissioning
+    decomNodes = nm.getNodes(
+        HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        HddsProtos.NodeState.HEALTHY);
+    assertEquals(0, decomNodes.size());
+
+    scmClient.startMaintenanceNodes(Arrays.asList(
+        getDNHostAndPort(dns.get(0))), 10);
+
+    // None are decommissioning
+    decomNodes = nm.getNodes(
+        HddsProtos.NodeOperationalState.DECOMMISSIONING,
+        HddsProtos.NodeState.HEALTHY);
+    assertEquals(0, decomNodes.size());
+
+    // One is in Maintenance
+    decomNodes = nm.getNodes(
+        HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
+        HddsProtos.NodeState.HEALTHY);
+    assertEquals(1, decomNodes.size());
+
+    scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dns.get(0))));
+
+    // None are in maintenance
+    decomNodes = nm.getNodes(
+        HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
+        HddsProtos.NodeState.HEALTHY);
+    assertEquals(0, decomNodes.size());
+  }
+
+  private String getDNHostAndPort(DatanodeDetails dn) {
+    return dn.getHostName() + ":" + dn.getPorts().get(0).getValue();
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-help@hadoop.apache.org