You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2020/03/31 21:58:49 UTC

[hadoop-ozone] branch HDDS-1880-Decom updated: HDDS-2592. Add Datanode command to allow the datanode to persist its admin state (#521)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-1880-Decom
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push:
     new d902586  HDDS-2592. Add Datanode command to allow the datanode to persist its admin state (#521)
d902586 is described below

commit d902586b7ca39083ba66ea19778b537212bc42d3
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Mar 31 22:58:40 2020 +0100

    HDDS-2592. Add Datanode command to allow the datanode to persist its admin state (#521)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  98 ++++++++++++-
 hadoop-hdds/common/src/main/proto/hdds.proto       |   2 +
 .../container/common/helpers/DatanodeIdYaml.java   |  35 +++++
 .../common/statemachine/DatanodeStateMachine.java  |   2 +
 .../SetNodeOperationalStateCommandHandler.java     | 157 +++++++++++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java     |  12 ++
 .../commands/SetNodeOperationalStateCommand.java   |  89 ++++++++++++
 .../proto/StorageContainerDatanodeProtocol.proto   |   8 ++
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |   3 +-
 .../hdds/scm/node/DatanodeAdminNodeDetails.java    |   8 ++
 .../hdds/scm/node/NodeDecommissionManager.java     |   8 +-
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  12 ++
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  26 +++-
 .../apache/hadoop/hdds/scm/node/NodeStatus.java    |  32 ++++-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  81 ++++++++++-
 .../hadoop/hdds/scm/node/states/NodeStateMap.java  |   5 +-
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |   8 ++
 .../hadoop/hdds/scm/container/MockNodeManager.java |  10 ++
 .../hdds/scm/container/SimpleMockNodeManager.java  |  10 +-
 .../hdds/scm/node/TestNodeDecommissionManager.java |   5 +
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  30 ++++
 .../hdds/scm/node/states/TestNodeStateMap.java     |   5 +-
 .../testutils/ReplicationNodeManagerMock.java      |  15 +-
 23 files changed, 643 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 698a443..a2584d8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -49,6 +49,8 @@ public class DatanodeDetails extends NodeImpl implements
   private String hostName;
   private List<Port> ports;
   private String certSerialId;
+  private HddsProtos.NodeOperationalState persistedOpState;
+  private long persistedOpStateExpiryEpochSec = 0;
 
   /**
    * Constructs DatanodeDetails instance. DatanodeDetails.Builder is used
@@ -59,15 +61,23 @@ public class DatanodeDetails extends NodeImpl implements
    * @param networkLocation DataNode's network location path
    * @param ports Ports used by the DataNode
    * @param certSerialId serial id from SCM issued certificate.
+   * @param persistedOpState Operational State stored on DN.
+   * @param persistedOpStateExpiryEpochSec Seconds after the epoch the stored
+   *                                       state should expire.
    */
+  @SuppressWarnings("checkstyle:ParameterNumber")
   private DatanodeDetails(String uuid, String ipAddress, String hostName,
-      String networkLocation, List<Port> ports, String certSerialId) {
+      String networkLocation, List<Port> ports, String certSerialId,
+      HddsProtos.NodeOperationalState persistedOpState,
+      long persistedOpStateExpiryEpochSec) {
     super(hostName, networkLocation, NetConstants.NODE_COST_DEFAULT);
     this.uuid = UUID.fromString(uuid);
     this.ipAddress = ipAddress;
     this.hostName = hostName;
     this.ports = ports;
     this.certSerialId = certSerialId;
+    this.persistedOpState = persistedOpState;
+    this.persistedOpStateExpiryEpochSec = persistedOpStateExpiryEpochSec;
   }
 
   protected DatanodeDetails(DatanodeDetails datanodeDetails) {
@@ -78,6 +88,9 @@ public class DatanodeDetails extends NodeImpl implements
     this.hostName = datanodeDetails.hostName;
     this.ports = datanodeDetails.ports;
     this.setNetworkName(datanodeDetails.getNetworkName());
+    this.persistedOpState = datanodeDetails.getPersistedOpState();
+    this.persistedOpStateExpiryEpochSec =
+        datanodeDetails.getPersistedOpStateExpiryEpochSec();
   }
 
   /**
@@ -156,6 +169,46 @@ public class DatanodeDetails extends NodeImpl implements
   }
 
   /**
+   * Return the persistedOpState. If the stored value is null, return the
+   * default value of IN_SERVICE.
+   *
+   * @return The OperationalState persisted on the datanode.
+   */
+  public HddsProtos.NodeOperationalState getPersistedOpState() {
+    if (persistedOpState == null) {
+      return HddsProtos.NodeOperationalState.IN_SERVICE;
+    } else {
+      return persistedOpState;
+    }
+  }
+
+  /**
+   * Set the persistedOpState for this instance.
+   *
+   * @param state The new operational state.
+   */
+  public void setPersistedOpState(HddsProtos.NodeOperationalState state) {
+    this.persistedOpState = state;
+  }
+
+  /**
+   * Get the persistedOpStateExpiryEpochSec for the instance.
+   * @return Seconds from the epoch when the operational state should expire.
+   */
+  public long getPersistedOpStateExpiryEpochSec() {
+    return persistedOpStateExpiryEpochSec;
+  }
+
+  /**
+   * Set persistedOpStateExpiryEpochSec.
+   * @param expiry The number of seconds after the epoch the operational state
+   *               should expire.
+   */
+  public void setPersistedOpStateExpiryEpochSec(long expiry) {
+    this.persistedOpStateExpiryEpochSec = expiry;
+  }
+
+  /**
    * Given the name returns port number, null if the asked port is not found.
    *
    * @param name Name of the port
@@ -200,6 +253,13 @@ public class DatanodeDetails extends NodeImpl implements
     if (datanodeDetailsProto.hasNetworkLocation()) {
       builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
     }
+    if (datanodeDetailsProto.hasPersistedOpState()) {
+      builder.setPersistedOpState(datanodeDetailsProto.getPersistedOpState());
+    }
+    if (datanodeDetailsProto.hasPersistedOpStateExpiry()) {
+      builder.setPersistedOpStateExpiry(
+          datanodeDetailsProto.getPersistedOpStateExpiry());
+    }
     return builder.build();
   }
 
@@ -226,6 +286,10 @@ public class DatanodeDetails extends NodeImpl implements
     if (!Strings.isNullOrEmpty(getNetworkLocation())) {
       builder.setNetworkLocation(getNetworkLocation());
     }
+    if (persistedOpState != null) {
+      builder.setPersistedOpState(persistedOpState);
+    }
+    builder.setPersistedOpStateExpiry(persistedOpStateExpiryEpochSec);
 
     for (Port port : ports) {
       builder.addPorts(HddsProtos.Port.newBuilder()
@@ -246,6 +310,8 @@ public class DatanodeDetails extends NodeImpl implements
         ", networkLocation: " +
         getNetworkLocation() +
         ", certSerialId: " + certSerialId +
+        ", persistedOpState: " + persistedOpState +
+        ", persistedOpStateExpiryEpochSec: " + persistedOpStateExpiryEpochSec +
         "}";
   }
 
@@ -285,6 +351,8 @@ public class DatanodeDetails extends NodeImpl implements
     private String networkLocation;
     private List<Port> ports;
     private String certSerialId;
+    private HddsProtos.NodeOperationalState persistedOpState;
+    private long persistedOpStateExpiryEpochSec = 0;
 
     /**
      * Default private constructor. To create Builder instance use
@@ -374,6 +442,31 @@ public class DatanodeDetails extends NodeImpl implements
     }
 
     /**
+     * Adds persistedOpState.
+     *
+     * @param state The operational state persisted on the datanode
+     *
+     * @return DatanodeDetails.Builder
+     */
+    public Builder setPersistedOpState(HddsProtos.NodeOperationalState state) {
+      this.persistedOpState = state;
+      return this;
+    }
+
+    /**
+     * Adds persistedOpStateExpiryEpochSec.
+     *
+     * @param expiry The seconds after the epoch the operational state should
+     *              expire.
+     *
+     * @return DatanodeDetails.Builder
+     */
+    public Builder setPersistedOpStateExpiry(long expiry) {
+      this.persistedOpStateExpiryEpochSec = expiry;
+      return this;
+    }
+
+    /**
      * Builds and returns DatanodeDetails instance.
      *
      * @return DatanodeDetails
@@ -384,7 +477,8 @@ public class DatanodeDetails extends NodeImpl implements
         networkLocation = NetConstants.DEFAULT_RACK;
       }
       DatanodeDetails dn = new DatanodeDetails(id, ipAddress, hostName,
-          networkLocation, ports, certSerialId);
+          networkLocation, ports, certSerialId, persistedOpState,
+          persistedOpStateExpiryEpochSec);
       if (networkName != null) {
         dn.setNetworkName(networkName);
       }
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index ad45d30..f629560 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -37,6 +37,8 @@ message DatanodeDetailsProto {
     // network name, can be Ip address or host name, depends
     optional string networkName = 6;
     optional string networkLocation = 7; // Network topology location
+    optional NodeOperationalState persistedOpState = 8; // The Operational state persisted in the datanode.id file
+    optional int64 persistedOpStateExpiry = 9; // The seconds after the epoch when the OpState should expire
 }
 
 /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java
index d3efa98..d016569 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.yaml.snakeyaml.DumperOptions;
 import org.yaml.snakeyaml.Yaml;
 
@@ -82,6 +83,12 @@ public final class DatanodeIdYaml {
           .setIpAddress(datanodeDetailsYaml.getIpAddress())
           .setHostName(datanodeDetailsYaml.getHostName())
           .setCertSerialId(datanodeDetailsYaml.getCertSerialId());
+      if (datanodeDetailsYaml.getPersistedOpState() != null) {
+        builder.setPersistedOpState(HddsProtos.NodeOperationalState.valueOf(
+            datanodeDetailsYaml.getPersistedOpState()));
+      }
+      builder.setPersistedOpStateExpiry(
+          datanodeDetailsYaml.getPersistedOpStateExpiryEpochSec());
 
       if (!MapUtils.isEmpty(datanodeDetailsYaml.getPortDetails())) {
         for (Map.Entry<String, Integer> portEntry :
@@ -105,6 +112,8 @@ public final class DatanodeIdYaml {
     private String ipAddress;
     private String hostName;
     private String certSerialId;
+    private String persistedOpState;
+    private long persistedOpStateExpiryEpochSec = 0;
     private Map<String, Integer> portDetails;
 
     public DatanodeDetailsYaml() {
@@ -113,11 +122,15 @@ public final class DatanodeIdYaml {
 
     private DatanodeDetailsYaml(String uuid, String ipAddress,
                                 String hostName, String certSerialId,
+                                String persistedOpState,
+                                long persistedOpStateExpiryEpochSec,
                                 Map<String, Integer> portDetails) {
       this.uuid = uuid;
       this.ipAddress = ipAddress;
       this.hostName = hostName;
       this.certSerialId = certSerialId;
+      this.persistedOpState = persistedOpState;
+      this.persistedOpStateExpiryEpochSec = persistedOpStateExpiryEpochSec;
       this.portDetails = portDetails;
     }
 
@@ -137,6 +150,14 @@ public final class DatanodeIdYaml {
       return certSerialId;
     }
 
+    public String getPersistedOpState() {
+      return persistedOpState;
+    }
+
+    public long getPersistedOpStateExpiryEpochSec() {
+      return persistedOpStateExpiryEpochSec;
+    }
+
     public Map<String, Integer> getPortDetails() {
       return portDetails;
     }
@@ -157,6 +178,14 @@ public final class DatanodeIdYaml {
       this.certSerialId = certSerialId;
     }
 
+    public void setPersistedOpState(String persistedOpState) {
+      this.persistedOpState = persistedOpState;
+    }
+
+    public void setPersistedOpStateExpiryEpochSec(long opStateExpiryEpochSec) {
+      this.persistedOpStateExpiryEpochSec = opStateExpiryEpochSec;
+    }
+
     public void setPortDetails(Map<String, Integer> portDetails) {
       this.portDetails = portDetails;
     }
@@ -172,11 +201,17 @@ public final class DatanodeIdYaml {
       }
     }
 
+    String persistedOpString = null;
+    if (datanodeDetails.getPersistedOpState() != null) {
+      persistedOpString = datanodeDetails.getPersistedOpState().name();
+    }
     return new DatanodeDetailsYaml(
         datanodeDetails.getUuid().toString(),
         datanodeDetails.getIpAddress(),
         datanodeDetails.getHostName(),
         datanodeDetails.getCertSerialId(),
+        persistedOpString,
+        datanodeDetails.getPersistedOpStateExpiryEpochSec(),
         portDetails);
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5229ae8..6835ef6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .ReplicateContainerCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
@@ -138,6 +139,7 @@ public class DatanodeStateMachine implements Closeable {
             dnConf.getContainerDeleteThreads()))
         .addHandler(new ClosePipelineCommandHandler())
         .addHandler(new CreatePipelineCommandHandler(conf))
+        .addHandler(new SetNodeOperationalStateCommandHandler(conf))
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
new file mode 100644
index 0000000..1fcf5a2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Handle the SetNodeOperationalStateCommand sent from SCM to the datanode
+ * to persist the current operational state.
+ */
+public class SetNodeOperationalStateCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SetNodeOperationalStateCommandHandler.class);
+  private final Configuration conf;
+  private final AtomicInteger invocationCount = new AtomicInteger(0);
+  private final AtomicLong totalTime = new AtomicLong(0);
+
+  /**
+   * Set Node State command handler.
+   *
+   * @param conf - Configuration for the datanode.
+   */
+  public SetNodeOperationalStateCommandHandler(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command - SCM Command
+   * @param container - Ozone Container.
+   * @param context - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    long startTime = Time.monotonicNow();
+    invocationCount.incrementAndGet();
+    StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto
+        setNodeCmdProto = null;
+
+    if (command.getType() != Type.setNodeOperationalStateCommand) {
+      LOG.warn("Skipping handling command, expected command "
+              + "type {} but found {}",
+          Type.setNodeOperationalStateCommand, command.getType());
+      return;
+    }
+    SetNodeOperationalStateCommand setNodeCmd =
+        (SetNodeOperationalStateCommand) command;
+    setNodeCmdProto = setNodeCmd.getProto();
+    DatanodeDetails dni = context.getParent().getDatanodeDetails();
+    dni.setPersistedOpState(setNodeCmdProto.getNodeOperationalState());
+    dni.setPersistedOpStateExpiryEpochSec(
+        setNodeCmd.getStateExpiryEpochSeconds());
+    try {
+      persistDatanodeDetails(dni);
+    } catch (IOException ioe) {
+      LOG.error("Failed to persist the datanode state", ioe);
+      // TODO - this should probably be raised, but it will break the command
+      //      handler interface.
+    }
+    totalTime.addAndGet(Time.monotonicNow() - startTime);
+  }
+
+  // TODO - this duplicates code in HddsDatanodeService and InitDatanodeState
+  //        Need to refactor.
+  private void persistDatanodeDetails(DatanodeDetails dnDetails)
+      throws IOException {
+    String idFilePath = HddsServerUtil.getDatanodeIdFilePath(conf);
+    if (idFilePath == null || idFilePath.isEmpty()) {
+      LOG.error("A valid path is needed for config setting {}",
+          ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR);
+      throw new IllegalArgumentException(
+          ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR +
+              " must be defined. See" +
+              " https://wiki.apache.org/hadoop/Ozone#Configuration" +
+              " for details on configuring Ozone.");
+    }
+
+    Preconditions.checkNotNull(idFilePath);
+    File idFile = new File(idFilePath);
+    ContainerUtils.writeDatanodeDetailsTo(dnDetails, idFile);
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+      getCommandType() {
+    return Type.setNodeOperationalStateCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return invocationCount.intValue();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    final int invocations = invocationCount.get();
+    return invocations == 0 ?
+        0 : totalTime.get() / invocations;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index a55d0d6..fa66a57 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -331,6 +332,17 @@ public class HeartbeatEndpointTask
         }
         this.context.addCommand(closePipelineCommand);
         break;
+      case setNodeOperationalStateCommand:
+        SetNodeOperationalStateCommand setNodeOperationalStateCommand =
+            SetNodeOperationalStateCommand.getFromProtobuf(
+                commandResponseProto.getSetNodeOperationalStateCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM set operational state command. State: {} " +
+              "Expiry: {}", setNodeOperationalStateCommand.getOpState(),
+              setNodeOperationalStateCommand.getStateExpiryEpochSeconds());
+        }
+        this.context.addCommand(setNodeOperationalStateCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeOperationalStateCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeOperationalStateCommand.java
new file mode 100644
index 0000000..3ff7949
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeOperationalStateCommand.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto;
+
+/**
+ * A command used to persist the current node operational state on the datanode.
+ */
+public class SetNodeOperationalStateCommand
+    extends SCMCommand<SetNodeOperationalStateCommandProto> {
+
+  private final HddsProtos.NodeOperationalState opState;
+  private long stateExpiryEpochSeconds;
+
+  /**
+   * Ctor that creates a SetNodeOperationalStateCommand.
+   *
+   * @param id    - Command ID. Something a time stamp would suffice.
+   * @param state - OperationalState that we want the node to be set into.
+   * @param stateExpiryEpochSeconds The epoch time when the state should
+   *                                expire, or zero for the state to remain
+   *                                indefinitely.
+   */
+  public SetNodeOperationalStateCommand(long id,
+      HddsProtos.NodeOperationalState state, long stateExpiryEpochSeconds) {
+    super(id);
+    this.opState = state;
+    this.stateExpiryEpochSeconds = stateExpiryEpochSeconds;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type  - This is setNodeOperationalStateCommand.
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.setNodeOperationalStateCommand;
+  }
+
+  /**
+   * Gets the protobuf message of this object.
+   *
+   * @return A protobuf message.
+   */
+  @Override
+  public SetNodeOperationalStateCommandProto getProto() {
+    return SetNodeOperationalStateCommandProto.newBuilder()
+        .setCmdId(getId())
+        .setNodeOperationalState(opState)
+        .setStateExpiryEpochSeconds(stateExpiryEpochSeconds).build();
+  }
+
+  public HddsProtos.NodeOperationalState getOpState() {
+    return opState;
+  }
+
+  public long getStateExpiryEpochSeconds() {
+    return stateExpiryEpochSeconds;
+  }
+
+  public static SetNodeOperationalStateCommand getFromProtobuf(
+      SetNodeOperationalStateCommandProto cmdProto) {
+    Preconditions.checkNotNull(cmdProto);
+    return new SetNodeOperationalStateCommand(cmdProto.getCmdId(),
+        cmdProto.getNodeOperationalState(),
+        cmdProto.getStateExpiryEpochSeconds());
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 171ea86..4a977e1 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -287,6 +287,7 @@ message SCMCommandProto {
     replicateContainerCommand = 5;
     createPipelineCommand = 6;
     closePipelineCommand = 7;
+    setNodeOperationalStateCommand = 8;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -297,6 +298,7 @@ message SCMCommandProto {
   optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
   optional CreatePipelineCommandProto createPipelineCommandProto = 7;
   optional ClosePipelineCommandProto closePipelineCommandProto = 8;
+  optional SetNodeOperationalStateCommandProto setNodeOperationalStateCommandProto = 9;
 }
 
 /**
@@ -384,6 +386,12 @@ message ClosePipelineCommandProto {
   required int64 cmdId = 2;
 }
 
+message SetNodeOperationalStateCommandProto {
+  required  int64 cmdId = 1;
+  required  NodeOperationalState nodeOperationalState = 2;
+  required  int64 stateExpiryEpochSeconds = 3;
+}
+
 /**
  * Protocol used from a datanode to StorageContainerManager.
  *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
index 4d2d895..a1d47fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
@@ -357,7 +357,8 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor {
 
   private void setNodeOpState(DatanodeAdminNodeDetails dn,
       HddsProtos.NodeOperationalState state) throws NodeNotFoundException {
-    nodeManager.setNodeOperationalState(dn.getDatanodeDetails(), state);
+    nodeManager.setNodeOperationalState(dn.getDatanodeDetails(), state,
+        dn.getMaintenanceEnd() / 1000);
   }
 
   private NodeStatus getNodeStatus(DatanodeDetails dnd)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
index c23fcd2..9c8a905 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
@@ -107,6 +107,14 @@ public class DatanodeAdminNodeDetails {
   }
 
   /**
+   * Returns the maintenance end time as milliseconds from the epoch.
+   * @return The maintenance end time, or zero if no end time is set.
+   */
+  public long getMaintenanceEnd() {
+    return maintenanceEndTime;
+  }
+
+  /**
    * Matches only on the DatanodeDetails field, which compares only the UUID
    * of the node to determine of they are the same object or not.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index 06fe270..3258fef 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -314,9 +314,15 @@ public class NodeDecommissionManager {
       throws NodeNotFoundException, InvalidNodeStateException {
     NodeStatus nodeStatus = getNodeStatus(dn);
     NodeOperationalState opState = nodeStatus.getOperationalState();
+
+    long maintenanceEnd = 0;
+    if (endInHours != 0) {
+      maintenanceEnd =
+          (System.currentTimeMillis() / 1000L) + (endInHours * 60L * 60L);
+    }
     if (opState == NodeOperationalState.IN_SERVICE) {
       nodeManager.setNodeOperationalState(
-          dn, NodeOperationalState.ENTERING_MAINTENANCE);
+          dn, NodeOperationalState.ENTERING_MAINTENANCE, maintenanceEnd);
       monitor.startMonitoring(dn, endInHours);
       LOG.info("Starting Maintenance for node {}", dn);
     } else if (nodeStatus.isMaintenance()) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index b595d00..44ab581 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -142,6 +142,18 @@ public interface NodeManager extends StorageContainerNodeProtocol,
       NodeOperationalState newState) throws NodeNotFoundException;
 
   /**
+   * Set the operational state of a node.
+   * @param datanodeDetails The datanode to set the new state for
+   * @param newState The new operational state for the node
+   * @param opStateExpiryEpocSec Seconds from the epoch when the operational
+   *                             state should end. Zero indicates the state
+   *                             never ends.
+   */
+  void setNodeOperationalState(DatanodeDetails datanodeDetails,
+       NodeOperationalState newState,
+       long opStateExpiryEpocSec) throws NodeNotFoundException;
+
+  /**
    * Get set of pipelines a datanode is part of.
    * @param datanodeDetails DatanodeDetails
    * @return Set of PipelineID
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index b16e79c..f18ac60 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -358,10 +358,28 @@ public class NodeStateManager implements Runnable, Closeable {
    */
   public void setNodeOperationalState(DatanodeDetails dn,
       NodeOperationalState newState)  throws NodeNotFoundException {
+    setNodeOperationalState(dn, newState, 0);
+  }
+
+  /**
+   * Sets the operational state of the given node. Intended to be called when
+   * a node is being decommissioned etc.
+   *
+   * @param dn The datanode having its state set
+   * @param newState The new operational State of the node.
+   * @param stateExpiryEpochSec The number of seconds from the epoch when the
+   *                            operational state should expire. Passing zero
+   *                            indicates the state will never expire
+   */
+  public void setNodeOperationalState(DatanodeDetails dn,
+      NodeOperationalState newState,
+      long stateExpiryEpochSec)  throws NodeNotFoundException {
     DatanodeInfo dni = nodeStateMap.getNodeInfo(dn.getUuid());
     NodeStatus oldStatus = dni.getNodeStatus();
-    if (oldStatus.getOperationalState() != newState) {
-      nodeStateMap.updateNodeOperationalState(dn.getUuid(), newState);
+    if (oldStatus.getOperationalState() != newState ||
+        oldStatus.getOpStateExpiryEpochSeconds() != stateExpiryEpochSec) {
+      nodeStateMap.updateNodeOperationalState(
+          dn.getUuid(), newState, stateExpiryEpochSec);
       // This will trigger an event based on the nodes health when the
       // operational state changes. Eg a node that was IN_MAINTENANCE goes
       // to IN_SERVICE + HEALTHY. This will trigger the HEALTHY node event to
@@ -370,7 +388,9 @@ public class NodeStateManager implements Runnable, Closeable {
       // container replicas. Sometimes the event will do nothing, but it will
       // not do any harm either. Eg DECOMMISSIONING -> DECOMMISSIONED + HEALTHY
       // but the pipeline creation logic will ignore decommissioning nodes.
-      fireHealthStateEvent(oldStatus.getHealth(), dn);
+      if (oldStatus.getOperationalState() != newState) {
+        fireHealthStateEvent(oldStatus.getHealth(), dn);
+      }
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index b8a92e5..72ca015 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -25,17 +25,28 @@ import java.util.Objects;
 /**
  * This class is used to capture the current status of a datanode. This
  * includes its health (healthy, stale or dead) and its operation status (
- * in_service, decommissioned and maintenance mode.
+ * in_service, decommissioned and maintenance mode) along with the expiry time
+ * for the operational state (used with maintenance mode).
  */
 public class NodeStatus {
 
   private HddsProtos.NodeOperationalState operationalState;
   private HddsProtos.NodeState health;
+  private long opStateExpiryEpochSeconds;
 
   public NodeStatus(HddsProtos.NodeOperationalState operationalState,
              HddsProtos.NodeState health) {
     this.operationalState = operationalState;
     this.health = health;
+    this.opStateExpiryEpochSeconds = 0;
+  }
+
+  public NodeStatus(HddsProtos.NodeOperationalState operationalState,
+                    HddsProtos.NodeState health,
+                    long opStateExpiryEpochSeconds) {
+    this.operationalState = operationalState;
+    this.health = health;
+    this.opStateExpiryEpochSeconds = opStateExpiryEpochSeconds;
   }
 
   public static NodeStatus inServiceHealthy() {
@@ -61,6 +72,17 @@ public class NodeStatus {
     return operationalState;
   }
 
+  public long getOpStateExpiryEpochSeconds() {
+    return opStateExpiryEpochSeconds;
+  }
+
+  public boolean operationalStateExpired() {
+    if (0 == opStateExpiryEpochSeconds) {
+      return false;
+    }
+    return System.currentTimeMillis() / 1000 >= opStateExpiryEpochSeconds;
+  }
+
   /**
    * Returns true if the nodeStatus indicates the node is in any decommission
    * state.
@@ -163,7 +185,8 @@ public class NodeStatus {
     }
     NodeStatus other = (NodeStatus) obj;
     if (this.operationalState == other.operationalState &&
-        this.health == other.health) {
+        this.health == other.health
+        && this.opStateExpiryEpochSeconds == other.opStateExpiryEpochSeconds) {
       return true;
     }
     return false;
@@ -171,12 +194,13 @@ public class NodeStatus {
 
   @Override
   public int hashCode() {
-    return Objects.hash(health, operationalState);
+    return Objects.hash(health, operationalState, opStateExpiryEpochSeconds);
   }
 
   @Override
   public String toString() {
-    return "OperationalState: "+operationalState+" Health: "+health;
+    return "OperationalState: "+operationalState+" Health: "+health+
+        " OperationalStateExpiry: "+opStateExpiryEpochSeconds;
   }
 
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index d89dac1..3f17e32 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -64,11 +65,13 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -231,7 +234,23 @@ public class SCMNodeManager implements NodeManager {
   @Override
   public void setNodeOperationalState(DatanodeDetails datanodeDetails,
       NodeOperationalState newState) throws NodeNotFoundException{
-    nodeStateManager.setNodeOperationalState(datanodeDetails, newState);
+    setNodeOperationalState(datanodeDetails, newState, 0);
+  }
+
+  /**
+   * Set the operational state of a node.
+   * @param datanodeDetails The datanode to set the new state for
+   * @param newState The new operational state for the node
+   * @param opStateExpiryEpocSec Seconds from the epoch when the operational
+   *                             state should end. Zero indicates the state
+   *                             never ends.
+   */
+  @Override
+  public void setNodeOperationalState(DatanodeDetails datanodeDetails,
+      NodeOperationalState newState, long opStateExpiryEpocSec)
+      throws NodeNotFoundException{
+    nodeStateManager.setNodeOperationalState(
+        datanodeDetails, newState, opStateExpiryEpocSec);
   }
 
   /**
@@ -312,6 +331,7 @@ public class SCMNodeManager implements NodeManager {
             datanodeDetails.toString());
       }
     }
+    registerInitialDatanodeOpState(datanodeDetails);
 
     return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
         .setDatanode(datanodeDetails)
@@ -320,6 +340,29 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
+   * When a node registers with SCM, the operational state stored on the
+   * datanode is the source of truth. Therefore, if the datanode reports
+   * anything other than IN_SERVICE on registration, the state in SCM should be
+   * updated to reflect the datanode state.
+   * @param dn
+   */
+  private void registerInitialDatanodeOpState(DatanodeDetails dn) {
+    try {
+      HddsProtos.NodeOperationalState dnOpState = dn.getPersistedOpState();
+      if (dnOpState != NodeOperationalState.IN_SERVICE) {
+        LOG.info("Updating nodeOperationalState on registration as the " +
+            "datanode has a persisted state of {} and expiry of {}",
+            dnOpState, dn.getPersistedOpStateExpiryEpochSec());
+        setNodeOperationalState(dn, dnOpState,
+            dn.getPersistedOpStateExpiryEpochSec());
+      }
+    } catch (NodeNotFoundException e) {
+      LOG.error("Unable to find the node when setting the operational state",
+          e);
+    }
+  }
+
+  /**
    * Add an entry to the dnsToUuidMap, which maps hostname / IP to the DNs
    * running on that host. As each address can have many DNs running on it,
    * this is a one to many mapping.
@@ -352,6 +395,7 @@ public class SCMNodeManager implements NodeManager {
     try {
       nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
       metrics.incNumHBProcessed();
+      updateDatanodeOpState(datanodeDetails);
     } catch (NodeNotFoundException e) {
       metrics.incNumHBProcessingFailed();
       LOG.error("SCM trying to process heartbeat from an " +
@@ -360,6 +404,41 @@ public class SCMNodeManager implements NodeManager {
     return commandQueue.getCommand(datanodeDetails.getUuid());
   }
 
+  /**
+   * If the operational state or expiry reported in the datanode heartbeat do
+   * not match those stored in SCM, queue a command to update the state persisted
+   * on the datanode. Additionally, ensure the datanodeDetails stored in SCM
+   * match those reported in the heartbeat.
+   * This method should only be called when processing the
+   * heartbeat, and for a registered node, the information stored in SCM is the
+   * source of truth.
+   * @param reportedDn The DatanodeDetails taken from the node heartbeat.
+   * @throws NodeNotFoundException
+   */
+  private void updateDatanodeOpState(DatanodeDetails reportedDn)
+      throws NodeNotFoundException {
+    NodeStatus scmStatus = getNodeStatus(reportedDn);
+    if (scmStatus.getOperationalState() != reportedDn.getPersistedOpState()
+        || scmStatus.getOpStateExpiryEpochSeconds()
+        != reportedDn.getPersistedOpStateExpiryEpochSec()) {
+      LOG.info("Scheduling a command to update the operationalState " +
+          "persisted on the datanode as the reported value ({}, {}) does not " +
+          "match the value stored in SCM ({}, {})",
+          reportedDn.getPersistedOpState(),
+          reportedDn.getPersistedOpStateExpiryEpochSec(),
+          scmStatus.getOperationalState(),
+          scmStatus.getOpStateExpiryEpochSeconds());
+      commandQueue.addCommand(reportedDn.getUuid(),
+          new SetNodeOperationalStateCommand(
+              Time.monotonicNow(), scmStatus.getOperationalState(),
+              scmStatus.getOpStateExpiryEpochSeconds()));
+    }
+    DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
+    scmDnd.setPersistedOpStateExpiryEpochSec(
+        reportedDn.getPersistedOpStateExpiryEpochSec());
+    scmDnd.setPersistedOpState(reportedDn.getPersistedOpState());
+  }
+
   @Override
   public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
     try {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
index 6565e81..3cf232a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
@@ -113,13 +113,14 @@ public class NodeStateMap {
    * @throws NodeNotFoundException if the node is not present
    */
   public NodeStatus updateNodeOperationalState(UUID nodeId,
-      NodeOperationalState newOpState) throws NodeNotFoundException {
+      NodeOperationalState newOpState, long opStateExpiryEpochSeconds)
+      throws NodeNotFoundException {
     try {
       lock.writeLock().lock();
       DatanodeInfo dn = getNodeInfo(nodeId);
       NodeStatus oldStatus = dn.getNodeStatus();
       NodeStatus newStatus = new NodeStatus(
-          newOpState, oldStatus.getHealth());
+          newOpState, oldStatus.getHealth(), opStateExpiryEpochSeconds);
       dn.setNodeStatus(newStatus);
       return newStatus;
     } finally {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index d569044..98a25d2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
 import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
@@ -87,6 +88,7 @@ import static org.apache.hadoop.hdds.protocol.proto
 import static org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
     .closePipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.setNodeOperationalStateCommand;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
@@ -350,6 +352,12 @@ public class SCMDatanodeProtocolServer implements
           .setClosePipelineCommandProto(
               ((ClosePipelineCommand)cmd).getProto())
           .build();
+    case setNodeOperationalStateCommand:
+      return builder
+          .setCommandType(setNodeOperationalStateCommand)
+          .setSetNodeOperationalStateCommandProto(
+              ((SetNodeOperationalStateCommand)cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Scm command " +
           cmd.getType().toString() + " is not implemented");
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index d24cbfd..da04b71 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -279,6 +279,16 @@ public class MockNodeManager implements NodeManager {
   }
 
   /**
+   * Set the operational state of a node.
+   * @param datanodeDetails The datanode to set the new state for
+   * @param newState The new operational state for the node
+   */
+  public void setNodeOperationalState(DatanodeDetails datanodeDetails,
+      HddsProtos.NodeOperationalState newState, long opStateExpiryEpocSec)
+      throws NodeNotFoundException {
+  }
+
+  /**
    * Get set of pipelines a datanode is part of.
    * @param dnId - datanodeID
    * @return Set of PipelineID
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index dad3448..6982d03 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -100,12 +100,20 @@ public class SimpleMockNodeManager implements NodeManager {
   @Override
   public void setNodeOperationalState(DatanodeDetails dn,
       HddsProtos.NodeOperationalState newState) throws NodeNotFoundException {
+    setNodeOperationalState(dn, newState, 0);
+  }
+
+  @Override
+  public void setNodeOperationalState(DatanodeDetails dn,
+      HddsProtos.NodeOperationalState newState, long opStateExpiryEpocSec)
+      throws NodeNotFoundException {
     DatanodeInfo dni = nodeMap.get(dn.getUuid());
     if (dni == null) {
       throw new NodeNotFoundException();
     }
     dni.setNodeStatus(
-        new NodeStatus(newState, dni.getNodeStatus().getHealth()));
+        new NodeStatus(
+            newState, dni.getNodeStatus().getHealth(), opStateExpiryEpocSec));
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
index df62438..a8bcf54 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
@@ -36,6 +36,7 @@ import java.util.Arrays;
 import java.util.ArrayList;
 import static junit.framework.TestCase.assertEquals;
 import static org.assertj.core.api.Fail.fail;
+import static org.junit.Assert.assertNotEquals;
 
 /**
  * Unit tests for the decommision manager.
@@ -182,8 +183,12 @@ public class TestNodeDecommissionManager {
         dns.get(2).getIpAddress()), 100);
     assertEquals(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
         nodeManager.getNodeStatus(dns.get(1)).getOperationalState());
+    assertNotEquals(0, nodeManager.getNodeStatus(
+        dns.get(1)).getOpStateExpiryEpochSeconds());
     assertEquals(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
         nodeManager.getNodeStatus(dns.get(2)).getOperationalState());
+    assertNotEquals(0, nodeManager.getNodeStatus(
+        dns.get(2)).getOpStateExpiryEpochSeconds());
 
     // Running the command again gives no error - nodes already decommissioning
     // are silently ignored.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 0917fa4..b78a76f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
@@ -275,6 +276,35 @@ public class TestSCMNodeManager {
   }
 
   /**
+   * Ensure that a change to the operationalState of a node fires a datanode
+   * event of type SetNodeOperationalStateCommand.
+   */
+  @Test
+  @Ignore // TODO - this test is no longer valid as the heartbeat processing
+          //        now generates the command message.
+  public void testSetNodeOpStateAndCommandFired()
+      throws IOException, NodeNotFoundException, AuthenticationException {
+    final int interval = 100;
+
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
+        MILLISECONDS);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      DatanodeDetails dn = TestUtils.createRandomDatanodeAndRegister(
+          nodeManager);
+      long expiry = System.currentTimeMillis() / 1000 + 1000;
+      nodeManager.setNodeOperationalState(dn,
+          HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, expiry);
+      List<SCMCommand> commands = nodeManager.getCommandQueue(dn.getUuid());
+
+      Assert.assertTrue(commands.get(0).getClass().equals(
+          SetNodeOperationalStateCommand.class));
+      assertEquals(1, commands.size());
+    }
+  }
+
+  /**
    * Asserts that a single node moves from Healthy to stale node, then from
    * stale node to dead node if it misses enough heartbeats.
    *
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
index 482f444..ad7139f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
@@ -82,11 +82,12 @@ public class TestNodeStateMap {
 
     NodeStatus expectedStatus = new NodeStatus(
         NodeOperationalState.DECOMMISSIONING,
-        NodeState.HEALTHY);
+        NodeState.HEALTHY, 999);
     NodeStatus returnedStatus = map.updateNodeOperationalState(
-        dn.getUuid(), expectedStatus.getOperationalState());
+        dn.getUuid(), expectedStatus.getOperationalState(), 999);
     assertEquals(expectedStatus, returnedStatus);
     assertEquals(returnedStatus, map.getNodeStatus(dn.getUuid()));
+    assertEquals(999, returnedStatus.getOpStateExpiryEpochSeconds());
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index ee96565..9654715 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -191,9 +191,22 @@ public class ReplicationNodeManagerMock implements NodeManager {
   @Override
   public void setNodeOperationalState(DatanodeDetails dd,
       HddsProtos.NodeOperationalState newState) throws NodeNotFoundException {
+    setNodeOperationalState(dd, newState, 0);
+  }
+
+  /**
+   * Set the operational state of a node.
+   * @param dd The datanode to set the new state for
+   * @param newState The new operational state for the node
+   */
+  @Override
+  public void setNodeOperationalState(DatanodeDetails dd,
+      HddsProtos.NodeOperationalState newState, long opStateExpiryEpocSec)
+      throws NodeNotFoundException {
     NodeStatus currentStatus = nodeStateMap.get(dd);
     if (currentStatus != null) {
-      nodeStateMap.put(dd, new NodeStatus(newState, currentStatus.getHealth()));
+      nodeStateMap.put(dd, new NodeStatus(newState, currentStatus.getHealth(),
+          opStateExpiryEpocSec));
     } else {
       throw new NodeNotFoundException();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org