Posted to commits@ozone.apache.org by ae...@apache.org on 2019/11/07 18:52:24 UTC

[hadoop-ozone] branch HDDS-1880-Decom updated: HDDS-2329. Destroy pipelines on any decommission or maintenance nodes Signed-off-by: Anu Engineer

This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch HDDS-1880-Decom
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push:
     new 278699e  HDDS-2329. Destroy pipelines on any decommission or maintenance nodes Signed-off-by: Anu Engineer <ae...@apache.org>
278699e is described below

commit 278699e4d112c777d8a9e301b48f8acafabaffb5
Author: S O'Donnell <so...@cloudera.com>
AuthorDate: Fri Oct 18 22:05:01 2019 +0100

    HDDS-2329. Destroy pipelines on any decommission or maintenance nodes
    Signed-off-by: Anu Engineer <ae...@apache.org>
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   5 +
 .../common/src/main/resources/ozone-default.xml    |  11 +
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   6 +
 .../hadoop/hdds/scm/node/DatanodeAdminMonitor.java | 282 +++++++++++++++++++++
 .../scm/node/DatanodeAdminMonitorInterface.java    |  41 +++
 .../hdds/scm/node/DatanodeAdminNodeDetails.java    | 157 ++++++++++++
 .../hdds/scm/node/NodeDecommissionManager.java     |  69 +++--
 .../hdds/scm/node/StartDatanodeAdminHandler.java   |  68 +++++
 .../hdds/scm/server/StorageContainerManager.java   |   7 +-
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    | 163 ++++++++++++
 .../scm/node/TestDatanodeAdminNodeDetails.java     | 168 ++++++++++++
 .../hdds/scm/node/TestNodeDecommissionManager.java |   2 +-
 12 files changed, 961 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1617806..a27d44e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -366,6 +366,11 @@ public final class ScmConfigKeys {
   public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled";
   public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true;
 
+  public static final String OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL =
+      "ozone.scm.datanode.admin.monitor.interval";
+  public static final String OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT =
+      "30s";
+
   /**
    * Never constructed.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b0a59fa..62878a4 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2501,4 +2501,15 @@
       The number of Recon Tasks that are waiting on updates from OM.
     </description>
   </property>
+  <property>
+    <name>ozone.scm.datanode.admin.monitor.interval</name>
+    <value>30s</value>
+    <tag>SCM</tag>
+    <description>
+      This sets how frequently the datanode admin monitor runs to check for
+      nodes added to the admin workflow or removed from it. It also checks
+      the progress of decommissioning and entering-maintenance nodes to
+      determine whether they have completed.
+    </description>
+  </property>
 </configuration>
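
Note: the new interval is a Hadoop-style time duration, so values such as
"30s" or "1m" are accepted. A minimal sketch of reading it with the key and
default this patch adds (the wrapper class here is hypothetical; the patch
does the equivalent in NodeDecommissionManager below):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public final class MonitorIntervalSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Parses "30s" (or an overridden value) into seconds.
        long intervalSeconds = conf.getTimeDuration(
            ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
            ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT,
            TimeUnit.SECONDS);
        System.out.println("Admin monitor interval: " + intervalSeconds + "s");
      }
    }
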
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 43d396e..97e998c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -170,6 +170,12 @@ public final class SCMEvents {
       new TypedEvent<>(DatanodeDetails.class, "Dead_Node");
 
   /**
+   * Triggered whenever a datanode starts admin (decommission or maintenance).
+   */
+  public static final TypedEvent<DatanodeDetails> START_ADMIN_ON_NODE =
+      new TypedEvent<>(DatanodeDetails.class, "START_ADMIN_ON_NODE");
+
+  /**
    * This event will be triggered whenever a datanode is moved from non-healthy
    * state to healthy state.
    */
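
Note: START_ADMIN_ON_NODE follows the usual TypedEvent publish/subscribe
pattern: the monitor fires it and StartDatanodeAdminHandler (added below)
consumes it. A minimal sketch of that pattern, assuming the in-process
EventQueue from hdds-server and the DatanodeDetails builder behave as used
elsewhere in this codebase:

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    public final class StartAdminEventSketch {
      public static void main(String[] args) {
        EventQueue queue = new EventQueue();
        // EventHandler has a single onMessage method, so a lambda works.
        queue.addHandler(SCMEvents.START_ADMIN_ON_NODE,
            (datanode, publisher) ->
                System.out.println("Admin started on " + datanode));
        DatanodeDetails dn = DatanodeDetails.newBuilder()
            .setUuid(java.util.UUID.randomUUID().toString())
            .build();
        queue.fireEvent(SCMEvents.START_ADMIN_ON_NODE, dn);
      }
    }
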
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java
new file mode 100644
index 0000000..3f35217
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Monitor thread which watches for nodes to be decommissioned, recommissioned
+ * or placed into maintenance. Newly added nodes are queued in pendingNodes
+ * and recommissioned nodes are queued in cancelledNodes. On each monitor
+ * 'tick', the cancelled nodes are processed and removed from the monitor.
+ * Then any pending nodes are added to the trackedNodes set, where they stay
+ * until decommission or maintenance has ended.
+ *
+ * Once a node is placed into trackedNodes, it goes through a workflow where
+ * the following happens:
+ *
+ * 1. First an event is fired to close any pipelines on the node, which will
+ *    also close any containers.
+ * 2. Next the containers on the node are obtained and checked to see if new
+ *    replicas are needed. If so, the new replicas are scheduled.
+ * 3. After scheduling replication, the node remains pending until replication
+ *    has completed.
+ * 4. At this stage the node will complete decommission or enter maintenance.
+ * 5. Maintenance nodes will remain tracked by this monitor until maintenance
+ *    is manually ended, or the maintenance window expires.
+ */
+public class DatanodeAdminMonitor implements DatanodeAdminMonitorInterface {
+
+  private OzoneConfiguration conf;
+  private EventPublisher eventQueue;
+  private NodeManager nodeManager;
+  private PipelineManager pipelineManager;
+  private Queue<DatanodeAdminNodeDetails> pendingNodes = new ArrayDeque<>();
+  private Queue<DatanodeAdminNodeDetails> cancelledNodes = new ArrayDeque<>();
+  private Set<DatanodeAdminNodeDetails> trackedNodes = new HashSet<>();
+  private StateMachine<States, Transitions> workflowSM;
+
+  /**
+   * States that a node must pass through when being decommissioned or placed
+   * into maintenance.
+   */
+  public enum States {
+    CLOSE_PIPELINES(1),
+    GET_CONTAINERS(2),
+    REPLICATE_CONTAINERS(3),
+    AWAIT_MAINTENANCE_END(4),
+    COMPLETE(5);
+
+    private int sequenceNumber;
+
+    States(int sequenceNumber) {
+      this.sequenceNumber = sequenceNumber;
+    }
+
+    public int getSequenceNumber() {
+      return sequenceNumber;
+    }
+  }
+
+  /**
+   * Transition events that occur to move a node from one state to the next.
+   */
+  public enum Transitions {
+    COMPLETE_DECOM_STAGE, COMPLETE_MAINT_STAGE, UNEXPECTED_NODE_STATE
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeAdminMonitor.class);
+
+  public DatanodeAdminMonitor(OzoneConfiguration config) {
+    conf = config;
+    initializeStateMachine();
+  }
+
+  @Override
+  public void setConf(OzoneConfiguration config) {
+    conf = config;
+  }
+
+  @Override
+  public void setEventQueue(EventPublisher eventQueue) {
+    this.eventQueue = eventQueue;
+  }
+
+  @Override
+  public void setNodeManager(NodeManager nm) {
+    nodeManager = nm;
+  }
+
+  @Override
+  public void setPipelineManager(PipelineManager pm) {
+    pipelineManager = pm;
+  }
+
+  /**
+   * Add a node to the decommission or maintenance workflow. The node will be
+   * queued and added to the workflow after a defined interval.
+   *
+   * @param dn The datanode to move into an admin state
+   * @param endInHours For nodes going into maintenance, the number of hours
+   *                   from now for maintenance to automatically end. Ignored
+   *                   for decommissioning nodes.
+   */
+  @Override
+  public synchronized void startMonitoring(DatanodeDetails dn, int endInHours) {
+    DatanodeAdminNodeDetails nodeDetails =
+        new DatanodeAdminNodeDetails(dn, workflowSM.getInitialState(),
+            endInHours);
+    cancelledNodes.remove(nodeDetails);
+    pendingNodes.add(nodeDetails);
+  }
+
+  /**
+   * Remove a node from the decommission or maintenance workflow, and return it
+   * to service. The node will be queued and removed from decommission or
+   * maintenance after a defined interval.
+   * @param dn The datanode for which to stop decommission or maintenance.
+   */
+  @Override
+  public synchronized void stopMonitoring(DatanodeDetails dn) {
+    DatanodeAdminNodeDetails nodeDetails = new DatanodeAdminNodeDetails(dn,
+        workflowSM.getInitialState(), 0);
+    pendingNodes.remove(nodeDetails);
+    cancelledNodes.add(nodeDetails);
+  }
+
+  /**
+   * Run an iteration of the monitor. This is the main run loop, and performs
+   * the following checks:
+   *
+   * 1. Check for any cancelled nodes and process them
+   * 2. Check for any newly added nodes and add them to the workflow
+   * 3. Check whether any tracked nodes have finished closing their pipelines
+   */
+  @Override
+  public void run() {
+    try {
+      synchronized (this) {
+        processCancelledNodes();
+        processPendingNodes();
+      }
+      checkPipelinesClosed();
+      if (trackedNodes.size() > 0 || pendingNodes.size() > 0) {
+        LOG.info("There are {} nodes tracked for decommission and "+
+            "maintenance. {} pending nodes.",
+            trackedNodes.size(), pendingNodes.size());
+      }
+    } catch (Exception e) {
+      LOG.error("Caught an error in the DatanodeAdminMonitor", e);
+    }
+  }
+
+  @Override
+  public int getPendingCount() {
+    return pendingNodes.size();
+  }
+
+  @Override
+  public int getCancelledCount() {
+    return cancelledNodes.size();
+  }
+
+  @Override
+  public int getTrackedNodeCount() {
+    return trackedNodes.size();
+  }
+
+  @VisibleForTesting
+  public Set<DatanodeAdminNodeDetails> getTrackedNodes() {
+    return trackedNodes;
+  }
+
+  /**
+   * Return the state machine used to transition a node through the admin
+   * workflow.
+   * @return The StateMachine used by the admin workflow
+   */
+  @VisibleForTesting
+  public StateMachine<States, Transitions> getWorkflowStateMachine() {
+    return workflowSM;
+  }
+
+  private void processCancelledNodes() {
+    while (!cancelledNodes.isEmpty()) {
+      DatanodeAdminNodeDetails dn = cancelledNodes.poll();
+      trackedNodes.remove(dn);
+      // TODO - fire event to bring node back into service?
+    }
+  }
+
+  private void processPendingNodes() {
+    while (!pendingNodes.isEmpty()) {
+      DatanodeAdminNodeDetails dn = pendingNodes.poll();
+      // Trigger event to async close the node pipelines.
+      eventQueue.fireEvent(SCMEvents.START_ADMIN_ON_NODE,
+          dn.getDatanodeDetails());
+      trackedNodes.add(dn);
+    }
+  }
+
+  private void checkPipelinesClosed() {
+    for (DatanodeAdminNodeDetails dn : trackedNodes) {
+      if (dn.getCurrentState() != States.CLOSE_PIPELINES) {
+        continue;
+      }
+      DatanodeDetails dnd = dn.getDatanodeDetails();
+      Set<PipelineID> pipelines = nodeManager.getPipelines(dnd);
+      if (pipelines == null || pipelines.size() == 0) {
+        NodeStatus nodeStatus = nodeManager.getNodeStatus(dnd);
+        try {
+          dn.transitionState(workflowSM, nodeStatus.getOperationalState());
+        } catch (InvalidStateTransitionException e) {
+          LOG.warn("Unexpected state transition", e);
+          // TODO - how to handle this? This means the node is not in
+          //        an expected state, eg it is IN_SERVICE when it should be
+          //        decommissioning, so should we abort decom altogether for it?
+          //        This could happen if a node is queued for cancel and not yet
+          //        processed.
+        }
+      } else {
+        LOG.info("Waiting for pipelines to close for {}. There are {} "+
+            "pipelines", dnd, pipelines.size());
+      }
+    }
+  }
+
+  /**
+   * Setup the state machine with the allowed transitions for a node to move
+   * through the maintenance workflow.
+   */
+  private void initializeStateMachine() {
+    Set<States> finalStates = new HashSet<>();
+    workflowSM = new StateMachine<>(States.CLOSE_PIPELINES, finalStates);
+    workflowSM.addTransition(States.CLOSE_PIPELINES,
+        States.GET_CONTAINERS, Transitions.COMPLETE_DECOM_STAGE);
+    workflowSM.addTransition(States.GET_CONTAINERS, States.REPLICATE_CONTAINERS,
+        Transitions.COMPLETE_DECOM_STAGE);
+    workflowSM.addTransition(States.REPLICATE_CONTAINERS, States.COMPLETE,
+        Transitions.COMPLETE_DECOM_STAGE);
+
+    workflowSM.addTransition(States.CLOSE_PIPELINES,
+        States.GET_CONTAINERS, Transitions.COMPLETE_MAINT_STAGE);
+    workflowSM.addTransition(States.GET_CONTAINERS, States.REPLICATE_CONTAINERS,
+        Transitions.COMPLETE_MAINT_STAGE);
+    workflowSM.addTransition(States.REPLICATE_CONTAINERS,
+        States.AWAIT_MAINTENANCE_END, Transitions.COMPLETE_MAINT_STAGE);
+    workflowSM.addTransition(States.AWAIT_MAINTENANCE_END,
+        States.COMPLETE, Transitions.COMPLETE_MAINT_STAGE);
+  }
+
+}
\ No newline at end of file
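
Note: the monitor is deliberately passive; nothing happens until run() is
invoked by the scheduled executor (or directly in tests). A minimal sketch of
one tick, mirroring TestDatanodeAdminMonitor below; the collaborators are
assumed to come from a running or mocked SCM:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.node.DatanodeAdminMonitor;
    import org.apache.hadoop.hdds.scm.node.NodeManager;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
    import org.apache.hadoop.hdds.server.events.EventPublisher;

    final class MonitorTickSketch {
      static void runOneTick(OzoneConfiguration conf, EventPublisher queue,
          NodeManager nodeManager, PipelineManager pipelineManager,
          DatanodeDetails dn) {
        DatanodeAdminMonitor monitor = new DatanodeAdminMonitor(conf);
        monitor.setEventQueue(queue);
        monitor.setNodeManager(nodeManager);
        monitor.setPipelineManager(pipelineManager);
        // endInHours is ignored for decommission, so pass 0.
        monitor.startMonitoring(dn, 0);
        // One 'tick': pending -> tracked, START_ADMIN_ON_NODE is fired.
        monitor.run();
      }
    }
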
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorInterface.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorInterface.java
new file mode 100644
index 0000000..d15162b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorInterface.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+/**
+ * Interface implemented by the DatanodeAdminMonitor, used to decommission
+ * or recommission nodes and take them in and out of maintenance.
+ */
+public interface DatanodeAdminMonitorInterface extends Runnable {
+
+  void setConf(OzoneConfiguration conf);
+  void setEventQueue(EventPublisher scm);
+  void setNodeManager(NodeManager nm);
+  void setPipelineManager(PipelineManager pm);
+  void startMonitoring(DatanodeDetails dn, int endInHours);
+  void stopMonitoring(DatanodeDetails dn);
+
+  int getPendingCount();
+  int getCancelledCount();
+  int getTrackedNodeCount();
+}
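
Note: the interface lets the decommission manager be wired to an alternative
monitor, for example in tests. A hypothetical no-op stub illustrating the
contract (not part of this patch; imports as in the interface above):

    /** No-op DatanodeAdminMonitorInterface stub, e.g. for unit tests. */
    class NoOpAdminMonitor implements DatanodeAdminMonitorInterface {
      @Override public void setConf(OzoneConfiguration conf) { }
      @Override public void setEventQueue(EventPublisher scm) { }
      @Override public void setNodeManager(NodeManager nm) { }
      @Override public void setPipelineManager(PipelineManager pm) { }
      @Override public void startMonitoring(DatanodeDetails dn,
          int endInHours) { }
      @Override public void stopMonitoring(DatanodeDetails dn) { }
      @Override public int getPendingCount() { return 0; }
      @Override public int getCancelledCount() { return 0; }
      @Override public int getTrackedNodeCount() { return 0; }
      @Override public void run() { }
    }
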
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
new file mode 100644
index 0000000..a0607e9
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used by the DatanodeAdminMonitor to track the state and
+ * details for Datanode decommission and maintenance. It provides a wrapper
+ * around a DatanodeDetails object adding some additional states and helper
+ * methods related to the admin workflow.
+ */
+public class DatanodeAdminNodeDetails {
+  private DatanodeDetails datanodeDetails;
+  private long maintenanceEndTime;
+  private DatanodeAdminMonitor.States currentState;
+  private long enteredStateAt = 0;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeAdminNodeDetails.class);
+
+
+  /**
+   * Create a new object given the DatanodeDetails and maintenance end time.
+   * @param dn The datanode going through the admin workflow
+   * @param initialState The initial admin workflow state for the node
+   * @param maintenanceEnd Hours from 'now' when maintenance should end
+   *                       automatically; passing zero means it never ends.
+   */
+  DatanodeAdminNodeDetails(DatanodeDetails dn,
+      DatanodeAdminMonitor.States initialState, long maintenanceEnd) {
+    datanodeDetails = dn;
+    setMaintenanceEnd(maintenanceEnd);
+    currentState = initialState;
+    enteredStateAt = System.currentTimeMillis();
+  }
+
+  public boolean shouldMaintenanceEnd() {
+    if (0 == maintenanceEndTime) {
+      return false;
+    }
+    return System.currentTimeMillis() >= maintenanceEndTime;
+  }
+
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+
+  /**
+   * Get the current admin workflow state for this node.
+   * @return The current Admin workflow state for this node
+   */
+  public DatanodeAdminMonitor.States getCurrentState() {
+    return currentState;
+  }
+
+  /**
+   * Set the number of hours after which maintenance should end. Passing zero
+   * indicates maintenance will never end automatically. A negative number of
+   * hours can be passed for testing purposes.
+   * @param hoursFromNow The number of hours from now when maintenance should
+   *                     end, or zero for it to never end.
+   */
+  @VisibleForTesting
+  public void setMaintenanceEnd(long hoursFromNow) {
+    if (0 == hoursFromNow) {
+      maintenanceEndTime = 0;
+      return;
+    }
+    // Convert hours to ms
+    long msFromNow = hoursFromNow * 60L * 60L * 1000L;
+    maintenanceEndTime = System.currentTimeMillis() + msFromNow;
+  }
+
+  /**
+   * Given the workflow stateMachine and the current node status
+   * (DECOMMISSIONING or ENTERING_MAINTENANCE) move the node to the next
+   * admin workflow state.
+   * @param sm The stateMachine which controls the state flow
+   * @param nodeOperationalState The current operational state for the node, eg
+   *                             decommissioning or entering_maintenance
+   * @return The new admin workflow state after the transition
+   * @throws InvalidStateTransitionException if the transition is not allowed
+   */
+  public DatanodeAdminMonitor.States transitionState(
+      StateMachine<DatanodeAdminMonitor.States,
+          DatanodeAdminMonitor.Transitions> sm,
+      NodeOperationalState nodeOperationalState)
+      throws InvalidStateTransitionException {
+
+    DatanodeAdminMonitor.States newState = sm.getNextState(currentState,
+        getTransition(nodeOperationalState));
+    long currentTime = System.currentTimeMillis();
+    LOG.info("Datanode {} moved from admin workflow state {} to {} after {} "+
+        "seconds", datanodeDetails, currentState, newState,
+        (currentTime - enteredStateAt)/1000L);
+    currentState = newState;
+    enteredStateAt = currentTime;
+    return currentState;
+  }
+
+  private DatanodeAdminMonitor.Transitions getTransition(
+      NodeOperationalState nodeState) {
+    if (nodeState == NodeOperationalState.DECOMMISSIONED ||
+        nodeState == NodeOperationalState.DECOMMISSIONING) {
+      return DatanodeAdminMonitor.Transitions.COMPLETE_DECOM_STAGE;
+    } else if (nodeState ==
+        NodeOperationalState.ENTERING_MAINTENANCE ||
+        nodeState == NodeOperationalState.IN_MAINTENANCE) {
+      return DatanodeAdminMonitor.Transitions.COMPLETE_MAINT_STAGE;
+    } else {
+      return DatanodeAdminMonitor.Transitions.UNEXPECTED_NODE_STATE;
+    }
+  }
+
+  /**
+   * Matches only on the DatanodeDetails field, which compares only the UUID
+   * of the node to determine if they are the same object or not.
+   *
+   * @param o The object to compare this with
+   * @return True if the objects match, otherwise false
+   *
+   */
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof DatanodeAdminNodeDetails &&
+        datanodeDetails.equals(
+            ((DatanodeAdminNodeDetails) o).getDatanodeDetails());
+  }
+
+  @Override
+  public int hashCode() {
+    return datanodeDetails.hashCode();
+  }
+
+}
\ No newline at end of file
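
Note: the maintenance end time is stored as an absolute timestamp in
milliseconds, so shouldMaintenanceEnd() is a simple clock comparison. A
minimal sketch of the semantics, mirroring
TestDatanodeAdminNodeDetails#testMaintenanceEnd below (the constructor is
package-private, so this lives in the same package; dn is an assumed
DatanodeDetails instance):

    DatanodeAdminNodeDetails details = new DatanodeAdminNodeDetails(
        dn, DatanodeAdminMonitor.States.CLOSE_PIPELINES, 0);
    // Zero hours: maintenance never ends automatically.
    assert !details.shouldMaintenanceEnd();
    details.setMaintenanceEnd(1);   // 1h from now: not yet due
    assert !details.shouldMaintenanceEnd();
    details.setMaintenanceEnd(-1);  // 1h in the past (testing only): due
    assert details.shouldMaintenanceEnd();
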
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index 9806fbb..fbc5981 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -16,14 +16,16 @@
  */
 package org.apache.hadoop.hdds.scm.node;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,23 +35,28 @@ import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Class used to manage datanodes scheduled for maintenance or decommission.
  */
 public class NodeDecommissionManager {
 
+  private ScheduledExecutorService executor;
+  private DatanodeAdminMonitorInterface monitor;
+
   private NodeManager nodeManager;
- // private PipelineManager pipeLineManager;
+  private PipelineManager pipelineManager;
  // private ContainerManager containerManager;
- // private OzoneConfiguration conf;
+  private EventPublisher eventQueue;
+  private OzoneConfiguration conf;
   private boolean useHostnames;
-
-  private List<DatanodeDetails> pendingNodes = new LinkedList<>();
+  private long monitorInterval;
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(DatanodeAdminManager.class);
-
+      LoggerFactory.getLogger(NodeDecommissionManager.class);
 
   static class HostDefinition {
     private String rawHostname;
@@ -157,17 +164,47 @@ public class NodeDecommissionManager {
     return false;
   }
 
-  public NodeDecommissionManager(OzoneConfiguration conf,
-      NodeManager nodeManager, PipelineManager pipelineManager,
-      ContainerManager containerManager) {
-    this.nodeManager = nodeManager;
-    //this.conf = conf;
-    //this.pipeLineManager = pipelineManager;
+  public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
+      PipelineManager pm, ContainerManager containerManager,
+      EventPublisher eventQueue) {
+    this.nodeManager = nm;
+    conf = config;
+    this.pipelineManager = pm;
     //this.containerManager = containerManager;
+    this.eventQueue = eventQueue;
+
+    executor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat("DatanodeAdminManager-%d")
+            .setDaemon(true).build());
 
     useHostnames = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+
+    monitorInterval = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT,
+        TimeUnit.SECONDS);
+    if (monitorInterval <= 0) {
+      LOG.warn("{} must be greater than zero, defaulting to {}",
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT);
+      conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT);
+      monitorInterval = conf.getTimeDuration(
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT,
+          TimeUnit.SECONDS);
+    }
+
+    monitor = new DatanodeAdminMonitor(conf);
+    monitor.setConf(conf);
+    monitor.setEventQueue(this.eventQueue);
+    monitor.setNodeManager(nodeManager);
+    monitor.setPipelineManager(pipelineManager);
+
+    executor.scheduleAtFixedRate(monitor, monitorInterval, monitorInterval,
+        TimeUnit.SECONDS);
   }
 
   public synchronized void decommissionNodes(List nodes)
@@ -201,7 +238,7 @@ public class NodeDecommissionManager {
       LOG.info("Starting Decommission for node {}", dn);
       nodeManager.setNodeOperationalState(
           dn, NodeOperationalState.DECOMMISSIONING);
-      pendingNodes.add(dn);
+      monitor.startMonitoring(dn, 0);
     } else if (opState == NodeOperationalState.DECOMMISSIONING
         || opState == NodeOperationalState.DECOMMISSIONED) {
       LOG.info("Start Decommission called on node {} in state {}. Nothing to "+
@@ -238,7 +275,7 @@ public class NodeDecommissionManager {
     if (opState != NodeOperationalState.IN_SERVICE) {
       nodeManager.setNodeOperationalState(
           dn, NodeOperationalState.IN_SERVICE);
-      pendingNodes.remove(dn);
+      monitor.stopMonitoring(dn);
       LOG.info("Recommissioned node {}", dn);
     } else {
       LOG.info("Recommission called on node {} with state {}. "+
@@ -278,7 +315,7 @@ public class NodeDecommissionManager {
     if (opState == NodeOperationalState.IN_SERVICE) {
       nodeManager.setNodeOperationalState(
           dn, NodeOperationalState.ENTERING_MAINTENANCE);
-      pendingNodes.add(dn);
+      monitor.startMonitoring(dn, endInHours);
       LOG.info("Starting Maintenance for node {}", dn);
     } else if (opState == NodeOperationalState.ENTERING_MAINTENANCE ||
         opState == NodeOperationalState.IN_MAINTENANCE) {
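
Note: with this change the manager no longer tracks pending nodes itself;
the public entry points delegate to the monitor, which the scheduled
executor drives every monitorInterval seconds. A hedged usage sketch
(decommissionNodes is taken from the hunk above; the host string and
collaborators are assumptions, and exception handling is elided):

    NodeDecommissionManager decom = new NodeDecommissionManager(
        conf, nodeManager, pipelineManager, containerManager, eventQueue);
    // Resolves the host against registered datanodes, flips its
    // operational state, and queues it on the admin monitor.
    decom.decommissionNodes(java.util.Arrays.asList("dn1.example.com"));
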
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java
new file mode 100644
index 0000000..9418a7a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * Handler which is fired when a datanode starts admin (decommission or
+ * maintenance).
+ */
+public class StartDatanodeAdminHandler
+    implements EventHandler<DatanodeDetails> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StartDatanodeAdminHandler.class);
+
+  private final NodeManager nodeManager;
+  private final PipelineManager pipelineManager;
+
+  public StartDatanodeAdminHandler(NodeManager nodeManager,
+      PipelineManager pipelineManager) {
+    this.nodeManager = nodeManager;
+    this.pipelineManager = pipelineManager;
+  }
+
+  @Override
+  public void onMessage(DatanodeDetails datanodeDetails,
+                        EventPublisher publisher) {
+    Set<PipelineID> pipelineIds =
+        nodeManager.getPipelines(datanodeDetails);
+    LOG.info("Admin start on datanode {}. Finalizing its pipelines {}",
+        datanodeDetails, pipelineIds);
+    for (PipelineID pipelineID : pipelineIds) {
+      try {
+        Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
+        pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+      } catch (IOException e) {
+        LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
+            datanodeDetails);
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 24572c7..db0f7cf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
 import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NewNodeHandler;
+import org.apache.hadoop.hdds.scm.node.StartDatanodeAdminHandler;
 import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
@@ -296,6 +297,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
         pipelineManager, containerManager);
+    StartDatanodeAdminHandler datanodeStartAdminHandler =
+        new StartDatanodeAdminHandler(scmNodeManager, pipelineManager);
     NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
         new NonHealthyToHealthyNodeHandler(pipelineManager, conf);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
@@ -338,7 +341,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         pipelineManager);
 
     scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
-        pipelineManager, containerManager);
+        pipelineManager, containerManager, eventQueue);
 
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
@@ -353,6 +356,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE,
         nonHealthyToHealthyNodeHandler);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
+    eventQueue.addHandler(SCMEvents.START_ADMIN_ON_NODE,
+        datanodeStartAdminHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
     eventQueue
         .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
new file mode 100644
index 0000000..0aa0221
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * Tests to ensure the DatanodeAdminMonitor is working correctly.
+ */
+public class TestDatanodeAdminMonitor {
+
+  private StorageContainerManager scm;
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private SCMPipelineManager pipelineManager;
+  private OzoneConfiguration conf;
+  private DatanodeAdminMonitor monitor;
+  private DatanodeDetails datanode1;
+  private DatanodeDetails datanode2;
+  private DatanodeDetails datanode3;
+
+  @Before
+  public void setup() throws IOException, AuthenticationException {
+    // This creates a mocked cluster of 6 nodes, where there are mock pipelines
+    // etc. Borrows heavily from TestDeadNodeHandler.
+    conf = new OzoneConfiguration();
+    String storageDir = GenericTestUtils.getTempPath(
+        TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+
+    scm = HddsTestUtils.getScm(conf);
+    nodeManager = scm.getScmNodeManager();
+    pipelineManager = (SCMPipelineManager)scm.getPipelineManager();
+    containerManager = scm.getContainerManager();
+
+    monitor = new DatanodeAdminMonitor(conf);
+    monitor.setEventQueue(scm.getEventQueue());
+    monitor.setNodeManager(nodeManager);
+    monitor.setPipelineManager(pipelineManager);
+
+    PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+
+    datanode1 = TestUtils.randomDatanodeDetails();
+    datanode2 = TestUtils.randomDatanodeDetails();
+    datanode3 = TestUtils.randomDatanodeDetails();
+
+    String storagePath = GenericTestUtils.getRandomizedTempPath()
+        .concat("/" + datanode1.getUuidString());
+
+    StorageContainerDatanodeProtocolProtos.StorageReportProto
+        storageOne = TestUtils.createStorageReport(
+        datanode1.getUuid(), storagePath, 100, 10, 90, null);
+
+    nodeManager.register(datanode1,
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(datanode2,
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(datanode3,
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(TestUtils.randomDatanodeDetails(),
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(TestUtils.randomDatanodeDetails(),
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(TestUtils.randomDatanodeDetails(),
+        TestUtils.createNodeReport(storageOne), null);
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void testNodeCanBeQueuedAndCancelled() {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    monitor.startMonitoring(dn, 0);
+    assertEquals(1, monitor.getPendingCount());
+
+    monitor.stopMonitoring(dn);
+    assertEquals(0, monitor.getPendingCount());
+    assertEquals(1, monitor.getCancelledCount());
+
+    monitor.startMonitoring(dn, 0);
+    assertEquals(1, monitor.getPendingCount());
+    assertEquals(0, monitor.getCancelledCount());
+
+  }
+
+  @Test
+  public void testMonitoredNodeHasPipelinesClosed()
+      throws NodeNotFoundException, TimeoutException, InterruptedException {
+
+    GenericTestUtils.waitFor(() -> nodeManager
+        .getPipelines(datanode1).size() == 2, 100, 20000);
+
+    nodeManager.setNodeOperationalState(datanode1,
+        HddsProtos.NodeOperationalState.DECOMMISSIONING);
+    monitor.startMonitoring(datanode1, 0);
+    monitor.run();
+    // Ensure the node moves from pending to tracked
+    assertEquals(0, monitor.getPendingCount());
+    assertEquals(1, monitor.getTrackedNodeCount());
+
+    // Ensure the pipelines are closed, as this is the first step in the admin
+    // workflow
+    GenericTestUtils.waitFor(() -> nodeManager
+        .getPipelines(datanode1).size() == 0, 100, 20000);
+
+    // Run the run loop again and ensure the tracked node is moved to the next
+    // state
+    monitor.run();
+    for (DatanodeAdminNodeDetails node : monitor.getTrackedNodes()) {
+      assertEquals(
+          DatanodeAdminMonitor.States.GET_CONTAINERS, node.getCurrentState());
+    }
+    // Finally, cancel decommission and see the node is removed from tracking
+    monitor.stopMonitoring(datanode1);
+    monitor.run();
+    assertEquals(0, monitor.getTrackedNodeCount());
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminNodeDetails.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminNodeDetails.java
new file mode 100644
index 0000000..3b5177e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminNodeDetails.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import
+    org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.TestCase.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests to validate the DatanodeAdminNodeDetails class.
+ */
+public class TestDatanodeAdminNodeDetails {
+
+  private OzoneConfiguration conf;
+  private DatanodeAdminMonitor monitor;
+  private final DatanodeAdminMonitor.States initialState =
+      DatanodeAdminMonitor.States.CLOSE_PIPELINES;
+
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    monitor = new DatanodeAdminMonitor(conf);
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void testEqualityBasedOnDatanodeDetails() {
+    DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
+    DatanodeAdminNodeDetails details1 =
+        new DatanodeAdminNodeDetails(dn1, initialState, 0);
+    DatanodeAdminNodeDetails details2 =
+        new DatanodeAdminNodeDetails(dn2, initialState, 0);
+
+    assertNotEquals(details1, details2);
+    assertEquals(details1,
+        new DatanodeAdminNodeDetails(dn1, initialState, 0));
+    assertNotEquals(details1, dn1);
+  }
+
+  @Test
+  public void testUnexpectedNodeStateGivesBadTransition() {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    DatanodeAdminNodeDetails details =
+        new DatanodeAdminNodeDetails(dn, initialState, 0);
+
+    try {
+      details.transitionState(monitor.getWorkflowStateMachine(),
+          NodeOperationalState.IN_SERVICE);
+      fail("InvalidStateTransitionException should be thrown");
+    } catch (InvalidStateTransitionException e) {
+      // Expected: IN_SERVICE is not a valid state in the admin workflow.
+    }
+  }
+
+  @Test
+  public void testWorkflowStatesTransitionCorrectlyForDecom()
+      throws InvalidStateTransitionException {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    DatanodeAdminNodeDetails details = new DatanodeAdminNodeDetails(dn,
+        initialState, 0);
+
+    // Initial state should be CLOSE_PIPELINES
+    assertEquals(DatanodeAdminMonitor.States.CLOSE_PIPELINES,
+        details.getCurrentState());
+
+    // Next State is GET_CONTAINERS
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.DECOMMISSIONING);
+    assertEquals(DatanodeAdminMonitor.States.GET_CONTAINERS,
+        details.getCurrentState());
+
+    // Next State is REPLICATE_CONTAINERS
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.DECOMMISSIONING);
+    assertEquals(DatanodeAdminMonitor.States.REPLICATE_CONTAINERS,
+        details.getCurrentState());
+
+    // Next State is COMPLETE
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.DECOMMISSIONING);
+    assertEquals(DatanodeAdminMonitor.States.COMPLETE,
+        details.getCurrentState());
+  }
+
+  @Test
+  public void testWorkflowStatesTransitionCorrectlyForMaint()
+      throws InvalidStateTransitionException {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    DatanodeAdminNodeDetails details = new DatanodeAdminNodeDetails(dn,
+        initialState, 0);
+
+    // Initial state should be CLOSE_PIPELINES
+    assertEquals(DatanodeAdminMonitor.States.CLOSE_PIPELINES,
+        details.getCurrentState());
+
+    // Next State is GET_CONTAINERS
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.ENTERING_MAINTENANCE);
+    assertEquals(DatanodeAdminMonitor.States.GET_CONTAINERS,
+        details.getCurrentState());
+
+    // Next State is REPLICATE_CONTAINERS
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.ENTERING_MAINTENANCE);
+    assertEquals(DatanodeAdminMonitor.States.REPLICATE_CONTAINERS,
+        details.getCurrentState());
+
+    // Next State is AWAIT_MAINTENANCE_END
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.ENTERING_MAINTENANCE);
+    assertEquals(DatanodeAdminMonitor.States.AWAIT_MAINTENANCE_END,
+        details.getCurrentState());
+
+    // Next State is COMPLETE
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.ENTERING_MAINTENANCE);
+    assertEquals(DatanodeAdminMonitor.States.COMPLETE,
+        details.getCurrentState());
+  }
+
+  @Test
+  public void testMaintenanceEnd() {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    // End in zero hours - should never end.
+    DatanodeAdminNodeDetails details = new DatanodeAdminNodeDetails(dn,
+        initialState, 0);
+    assertFalse(details.shouldMaintenanceEnd());
+
+    // End in 1 hour - maintenance should not end yet.
+    details.setMaintenanceEnd(1);
+    assertFalse(details.shouldMaintenanceEnd());
+
+    // Ended 1 hour ago - maintenance should end now.
+    details.setMaintenanceEnd(-1);
+    assertTrue(details.shouldMaintenanceEnd());
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
index 82bd26b..e3b9208 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
@@ -55,7 +55,7 @@ public class TestNodeDecommissionManager {
         TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
     nodeManager = createNodeManager(conf);
-    decom = new NodeDecommissionManager(conf, nodeManager, null, null);
+    decom = new NodeDecommissionManager(conf, nodeManager, null, null, null);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org