You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2020/03/31 21:58:49 UTC
[hadoop-ozone] branch HDDS-1880-Decom updated: HDDS-2592. Add
Datanode command to allow the datanode to persist its admin state (#521)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-1880-Decom
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push:
new d902586 HDDS-2592. Add Datanode command to allow the datanode to persist its admin state (#521)
d902586 is described below
commit d902586b7ca39083ba66ea19778b537212bc42d3
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Mar 31 22:58:40 2020 +0100
HDDS-2592. Add Datanode command to allow the datanode to persist its admin state (#521)
---
.../hadoop/hdds/protocol/DatanodeDetails.java | 98 ++++++++++++-
hadoop-hdds/common/src/main/proto/hdds.proto | 2 +
.../container/common/helpers/DatanodeIdYaml.java | 35 +++++
.../common/statemachine/DatanodeStateMachine.java | 2 +
.../SetNodeOperationalStateCommandHandler.java | 157 +++++++++++++++++++++
.../states/endpoint/HeartbeatEndpointTask.java | 12 ++
.../commands/SetNodeOperationalStateCommand.java | 89 ++++++++++++
.../proto/StorageContainerDatanodeProtocol.proto | 8 ++
.../hdds/scm/node/DatanodeAdminMonitorImpl.java | 3 +-
.../hdds/scm/node/DatanodeAdminNodeDetails.java | 8 ++
.../hdds/scm/node/NodeDecommissionManager.java | 8 +-
.../apache/hadoop/hdds/scm/node/NodeManager.java | 12 ++
.../hadoop/hdds/scm/node/NodeStateManager.java | 26 +++-
.../apache/hadoop/hdds/scm/node/NodeStatus.java | 32 ++++-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 81 ++++++++++-
.../hadoop/hdds/scm/node/states/NodeStateMap.java | 5 +-
.../hdds/scm/server/SCMDatanodeProtocolServer.java | 8 ++
.../hadoop/hdds/scm/container/MockNodeManager.java | 10 ++
.../hdds/scm/container/SimpleMockNodeManager.java | 10 +-
.../hdds/scm/node/TestNodeDecommissionManager.java | 5 +
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 30 ++++
.../hdds/scm/node/states/TestNodeStateMap.java | 5 +-
.../testutils/ReplicationNodeManagerMock.java | 15 +-
23 files changed, 643 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 698a443..a2584d8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -49,6 +49,8 @@ public class DatanodeDetails extends NodeImpl implements
private String hostName;
private List<Port> ports;
private String certSerialId;
+ private HddsProtos.NodeOperationalState persistedOpState;
+ private long persistedOpStateExpiryEpochSec = 0;
/**
* Constructs DatanodeDetails instance. DatanodeDetails.Builder is used
@@ -59,15 +61,23 @@ public class DatanodeDetails extends NodeImpl implements
* @param networkLocation DataNode's network location path
* @param ports Ports used by the DataNode
* @param certSerialId serial id from SCM issued certificate.
+ * @param persistedOpState Operational State stored on DN.
+ * @param persistedOpStateExpiryEpochSec Seconds after the epoch the stored
+ * state should expire.
*/
+ @SuppressWarnings("checkstyle:ParameterNumber")
private DatanodeDetails(String uuid, String ipAddress, String hostName,
- String networkLocation, List<Port> ports, String certSerialId) {
+ String networkLocation, List<Port> ports, String certSerialId,
+ HddsProtos.NodeOperationalState persistedOpState,
+ long persistedOpStateExpiryEpochSec) {
super(hostName, networkLocation, NetConstants.NODE_COST_DEFAULT);
this.uuid = UUID.fromString(uuid);
this.ipAddress = ipAddress;
this.hostName = hostName;
this.ports = ports;
this.certSerialId = certSerialId;
+ this.persistedOpState = persistedOpState;
+ this.persistedOpStateExpiryEpochSec = persistedOpStateExpiryEpochSec;
}
protected DatanodeDetails(DatanodeDetails datanodeDetails) {
@@ -78,6 +88,9 @@ public class DatanodeDetails extends NodeImpl implements
this.hostName = datanodeDetails.hostName;
this.ports = datanodeDetails.ports;
this.setNetworkName(datanodeDetails.getNetworkName());
+ this.persistedOpState = datanodeDetails.getPersistedOpState();
+ this.persistedOpStateExpiryEpochSec =
+ datanodeDetails.getPersistedOpStateExpiryEpochSec();
}
/**
@@ -156,6 +169,46 @@ public class DatanodeDetails extends NodeImpl implements
}
/**
+ * Return the persistedOpState. If the stored value is null, return the
+ * default value of IN_SERVICE.
+ *
+ * @return The OperationalState persisted on the datanode.
+ */
+ public HddsProtos.NodeOperationalState getPersistedOpState() {
+ if (persistedOpState == null) {
+ return HddsProtos.NodeOperationalState.IN_SERVICE;
+ } else {
+ return persistedOpState;
+ }
+ }
+
+ /**
+ * Set the persistedOpState for this instance.
+ *
+ * @param state The new operational state.
+ */
+ public void setPersistedOpState(HddsProtos.NodeOperationalState state) {
+ this.persistedOpState = state;
+ }
+
+ /**
+ * Get the persistedOpStateExpiryEpochSec for the instance.
+ * @return Seconds from the epoch when the operational state should expire.
+ */
+ public long getPersistedOpStateExpiryEpochSec() {
+ return persistedOpStateExpiryEpochSec;
+ }
+
+ /**
+ * Set persistedOpStateExpiryEpochSec.
+ * @param expiry The number of seconds after the epoch the operational state
+ * should expire.
+ */
+ public void setPersistedOpStateExpiryEpochSec(long expiry) {
+ this.persistedOpStateExpiryEpochSec = expiry;
+ }
+
+ /**
* Given the name returns port number, null if the asked port is not found.
*
* @param name Name of the port
@@ -200,6 +253,13 @@ public class DatanodeDetails extends NodeImpl implements
if (datanodeDetailsProto.hasNetworkLocation()) {
builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
}
+ if (datanodeDetailsProto.hasPersistedOpState()) {
+ builder.setPersistedOpState(datanodeDetailsProto.getPersistedOpState());
+ }
+ if (datanodeDetailsProto.hasPersistedOpStateExpiry()) {
+ builder.setPersistedOpStateExpiry(
+ datanodeDetailsProto.getPersistedOpStateExpiry());
+ }
return builder.build();
}
@@ -226,6 +286,10 @@ public class DatanodeDetails extends NodeImpl implements
if (!Strings.isNullOrEmpty(getNetworkLocation())) {
builder.setNetworkLocation(getNetworkLocation());
}
+ if (persistedOpState != null) {
+ builder.setPersistedOpState(persistedOpState);
+ }
+ builder.setPersistedOpStateExpiry(persistedOpStateExpiryEpochSec);
for (Port port : ports) {
builder.addPorts(HddsProtos.Port.newBuilder()
@@ -246,6 +310,8 @@ public class DatanodeDetails extends NodeImpl implements
", networkLocation: " +
getNetworkLocation() +
", certSerialId: " + certSerialId +
+ ", persistedOpState: " + persistedOpState +
+ ", persistedOpStateExpiryEpochSec: " + persistedOpStateExpiryEpochSec +
"}";
}
@@ -285,6 +351,8 @@ public class DatanodeDetails extends NodeImpl implements
private String networkLocation;
private List<Port> ports;
private String certSerialId;
+ private HddsProtos.NodeOperationalState persistedOpState;
+ private long persistedOpStateExpiryEpochSec = 0;
/**
* Default private constructor. To create Builder instance use
@@ -374,6 +442,31 @@ public class DatanodeDetails extends NodeImpl implements
}
/**
+ * Adds persistedOpState.
+ *
+ * @param state The operational state persisted on the datanode
+ *
+ * @return DatanodeDetails.Builder
+ */
+ public Builder setPersistedOpState(HddsProtos.NodeOperationalState state) {
+ this.persistedOpState = state;
+ return this;
+ }
+
+ /**
+ * Adds persistedOpStateExpiryEpochSec.
+ *
+ * @param expiry The seconds after the epoch the operational state should
+ * expire.
+ *
+ * @return DatanodeDetails.Builder
+ */
+ public Builder setPersistedOpStateExpiry(long expiry) {
+ this.persistedOpStateExpiryEpochSec = expiry;
+ return this;
+ }
+
+ /**
* Builds and returns DatanodeDetails instance.
*
* @return DatanodeDetails
@@ -384,7 +477,8 @@ public class DatanodeDetails extends NodeImpl implements
networkLocation = NetConstants.DEFAULT_RACK;
}
DatanodeDetails dn = new DatanodeDetails(id, ipAddress, hostName,
- networkLocation, ports, certSerialId);
+ networkLocation, ports, certSerialId, persistedOpState,
+ persistedOpStateExpiryEpochSec);
if (networkName != null) {
dn.setNetworkName(networkName);
}
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index ad45d30..f629560 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -37,6 +37,8 @@ message DatanodeDetailsProto {
// network name, can be Ip address or host name, depends
optional string networkName = 6;
optional string networkLocation = 7; // Network topology location
+ optional NodeOperationalState persistedOpState = 8; // The Operational state persisted in the datanode.id file
+ optional int64 persistedOpStateExpiry = 9; // The seconds after the epoch when the OpState should expire
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java
index d3efa98..d016569 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
@@ -82,6 +83,12 @@ public final class DatanodeIdYaml {
.setIpAddress(datanodeDetailsYaml.getIpAddress())
.setHostName(datanodeDetailsYaml.getHostName())
.setCertSerialId(datanodeDetailsYaml.getCertSerialId());
+ if (datanodeDetailsYaml.getPersistedOpState() != null) {
+ builder.setPersistedOpState(HddsProtos.NodeOperationalState.valueOf(
+ datanodeDetailsYaml.getPersistedOpState()));
+ }
+ builder.setPersistedOpStateExpiry(
+ datanodeDetailsYaml.getPersistedOpStateExpiryEpochSec());
if (!MapUtils.isEmpty(datanodeDetailsYaml.getPortDetails())) {
for (Map.Entry<String, Integer> portEntry :
@@ -105,6 +112,8 @@ public final class DatanodeIdYaml {
private String ipAddress;
private String hostName;
private String certSerialId;
+ private String persistedOpState;
+ private long persistedOpStateExpiryEpochSec = 0;
private Map<String, Integer> portDetails;
public DatanodeDetailsYaml() {
@@ -113,11 +122,15 @@ public final class DatanodeIdYaml {
private DatanodeDetailsYaml(String uuid, String ipAddress,
String hostName, String certSerialId,
+ String persistedOpState,
+ long persistedOpStateExpiryEpochSec,
Map<String, Integer> portDetails) {
this.uuid = uuid;
this.ipAddress = ipAddress;
this.hostName = hostName;
this.certSerialId = certSerialId;
+ this.persistedOpState = persistedOpState;
+ this.persistedOpStateExpiryEpochSec = persistedOpStateExpiryEpochSec;
this.portDetails = portDetails;
}
@@ -137,6 +150,14 @@ public final class DatanodeIdYaml {
return certSerialId;
}
+ public String getPersistedOpState() {
+ return persistedOpState;
+ }
+
+ public long getPersistedOpStateExpiryEpochSec() {
+ return persistedOpStateExpiryEpochSec;
+ }
+
public Map<String, Integer> getPortDetails() {
return portDetails;
}
@@ -157,6 +178,14 @@ public final class DatanodeIdYaml {
this.certSerialId = certSerialId;
}
+ public void setPersistedOpState(String persistedOpState) {
+ this.persistedOpState = persistedOpState;
+ }
+
+ public void setPersistedOpStateExpiryEpochSec(long opStateExpiryEpochSec) {
+ this.persistedOpStateExpiryEpochSec = opStateExpiryEpochSec;
+ }
+
public void setPortDetails(Map<String, Integer> portDetails) {
this.portDetails = portDetails;
}
@@ -172,11 +201,17 @@ public final class DatanodeIdYaml {
}
}
+ String persistedOpString = null;
+ if (datanodeDetails.getPersistedOpState() != null) {
+ persistedOpString = datanodeDetails.getPersistedOpState().name();
+ }
return new DatanodeDetailsYaml(
datanodeDetails.getUuid().toString(),
datanodeDetails.getIpAddress(),
datanodeDetails.getHostName(),
datanodeDetails.getCertSerialId(),
+ persistedOpString,
+ datanodeDetails.getPersistedOpStateExpiryEpochSec(),
portDetails);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5229ae8..6835ef6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.ReplicateContainerCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
@@ -138,6 +139,7 @@ public class DatanodeStateMachine implements Closeable {
dnConf.getContainerDeleteThreads()))
.addHandler(new ClosePipelineCommandHandler())
.addHandler(new CreatePipelineCommandHandler(conf))
+ .addHandler(new SetNodeOperationalStateCommandHandler(conf))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
new file mode 100644
index 0000000..1fcf5a2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Handle the SetNodeOperationalStateCommand sent from SCM to the datanode
+ * to persist the current operational state.
+ */
+public class SetNodeOperationalStateCommandHandler implements CommandHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SetNodeOperationalStateCommandHandler.class);
+ private final Configuration conf;
+ private final AtomicInteger invocationCount = new AtomicInteger(0);
+ private final AtomicLong totalTime = new AtomicLong(0);
+
+ /**
+ * Set Node State command handler.
+ *
+ * @param conf - Configuration for the datanode.
+ */
+ public SetNodeOperationalStateCommandHandler(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Handles a given SCM command.
+ *
+ * @param command - SCM Command
+ * @param container - Ozone Container.
+ * @param context - Current Context.
+ * @param connectionManager - The SCMs that we are talking to.
+ */
+ @Override
+ public void handle(SCMCommand command, OzoneContainer container,
+ StateContext context, SCMConnectionManager connectionManager) {
+ long startTime = Time.monotonicNow();
+ invocationCount.incrementAndGet();
+ StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto
+ setNodeCmdProto = null;
+
+ if (command.getType() != Type.setNodeOperationalStateCommand) {
+ LOG.warn("Skipping handling command, expected command "
+ + "type {} but found {}",
+ Type.setNodeOperationalStateCommand, command.getType());
+ return;
+ }
+ SetNodeOperationalStateCommand setNodeCmd =
+ (SetNodeOperationalStateCommand) command;
+ setNodeCmdProto = setNodeCmd.getProto();
+ DatanodeDetails dni = context.getParent().getDatanodeDetails();
+ dni.setPersistedOpState(setNodeCmdProto.getNodeOperationalState());
+ dni.setPersistedOpStateExpiryEpochSec(
+ setNodeCmd.getStateExpiryEpochSeconds());
+ try {
+ persistDatanodeDetails(dni);
+ } catch (IOException ioe) {
+ LOG.error("Failed to persist the datanode state", ioe);
+ // TODO - this should probably be raised, but it will break the command
+ // handler interface.
+ }
+ totalTime.addAndGet(Time.monotonicNow() - startTime);
+ }
+
+ // TODO - this duplicates code in HddsDatanodeService and InitDatanodeState
+ // Need to refactor.
+ private void persistDatanodeDetails(DatanodeDetails dnDetails)
+ throws IOException {
+ String idFilePath = HddsServerUtil.getDatanodeIdFilePath(conf);
+ if (idFilePath == null || idFilePath.isEmpty()) {
+ LOG.error("A valid path is needed for config setting {}",
+ ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR);
+ throw new IllegalArgumentException(
+ ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR +
+ " must be defined. See" +
+ " https://wiki.apache.org/hadoop/Ozone#Configuration" +
+ " for details on configuring Ozone.");
+ }
+
+ Preconditions.checkNotNull(idFilePath);
+ File idFile = new File(idFilePath);
+ ContainerUtils.writeDatanodeDetailsTo(dnDetails, idFile);
+ }
+
+ /**
+ * Returns the command type that this command handler handles.
+ *
+ * @return Type
+ */
+ @Override
+ public StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+ getCommandType() {
+ return Type.setNodeOperationalStateCommand;
+ }
+
+ /**
+ * Returns number of times this handler has been invoked.
+ *
+ * @return int
+ */
+ @Override
+ public int getInvocationCount() {
+ return invocationCount.intValue();
+ }
+
+ /**
+ * Returns the average time this function takes to run.
+ *
+ * @return long
+ */
+ @Override
+ public long getAverageRunTime() {
+ final int invocations = invocationCount.get();
+ return invocations == 0 ?
+ 0 : totalTime.get() / invocations;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index a55d0d6..fa66a57 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -331,6 +332,17 @@ public class HeartbeatEndpointTask
}
this.context.addCommand(closePipelineCommand);
break;
+ case setNodeOperationalStateCommand:
+ SetNodeOperationalStateCommand setNodeOperationalStateCommand =
+ SetNodeOperationalStateCommand.getFromProtobuf(
+ commandResponseProto.getSetNodeOperationalStateCommandProto());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received SCM set operational state command. State: {} " +
+ "Expiry: {}", setNodeOperationalStateCommand.getOpState(),
+ setNodeOperationalStateCommand.getStateExpiryEpochSeconds());
+ }
+ this.context.addCommand(setNodeOperationalStateCommand);
+ break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeOperationalStateCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeOperationalStateCommand.java
new file mode 100644
index 0000000..3ff7949
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeOperationalStateCommand.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto;
+
+/**
+ * A command used to persist the current node operational state on the datanode.
+ */
+public class SetNodeOperationalStateCommand
+ extends SCMCommand<SetNodeOperationalStateCommandProto> {
+
+ private final HddsProtos.NodeOperationalState opState;
+ private long stateExpiryEpochSeconds;
+
+ /**
+ * Ctor that creates a SetNodeOperationalStateCommand.
+ *
+ * @param id - Command ID. Something a time stamp would suffice.
+ * @param state - OperationalState that we want the node to be set to.
+ * @param stateExpiryEpochSeconds The epoch time when the state should
+ * expire, or zero for the state to remain
+ * indefinitely.
+ */
+ public SetNodeOperationalStateCommand(long id,
+ HddsProtos.NodeOperationalState state, long stateExpiryEpochSeconds) {
+ super(id);
+ this.opState = state;
+ this.stateExpiryEpochSeconds = stateExpiryEpochSeconds;
+ }
+
+ /**
+ * Returns the type of this command.
+ *
+ * @return Type - This is setNodeOperationalStateCommand.
+ */
+ @Override
+ public SCMCommandProto.Type getType() {
+ return SCMCommandProto.Type.setNodeOperationalStateCommand;
+ }
+
+ /**
+ * Gets the protobuf message of this object.
+ *
+ * @return A protobuf message.
+ */
+ @Override
+ public SetNodeOperationalStateCommandProto getProto() {
+ return SetNodeOperationalStateCommandProto.newBuilder()
+ .setCmdId(getId())
+ .setNodeOperationalState(opState)
+ .setStateExpiryEpochSeconds(stateExpiryEpochSeconds).build();
+ }
+
+ public HddsProtos.NodeOperationalState getOpState() {
+ return opState;
+ }
+
+ public long getStateExpiryEpochSeconds() {
+ return stateExpiryEpochSeconds;
+ }
+
+ public static SetNodeOperationalStateCommand getFromProtobuf(
+ SetNodeOperationalStateCommandProto cmdProto) {
+ Preconditions.checkNotNull(cmdProto);
+ return new SetNodeOperationalStateCommand(cmdProto.getCmdId(),
+ cmdProto.getNodeOperationalState(),
+ cmdProto.getStateExpiryEpochSeconds());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 171ea86..4a977e1 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -287,6 +287,7 @@ message SCMCommandProto {
replicateContainerCommand = 5;
createPipelineCommand = 6;
closePipelineCommand = 7;
+ setNodeOperationalStateCommand = 8;
}
// TODO: once we start using protoc 3.x, refactor this message using "oneof"
required Type commandType = 1;
@@ -297,6 +298,7 @@ message SCMCommandProto {
optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
optional CreatePipelineCommandProto createPipelineCommandProto = 7;
optional ClosePipelineCommandProto closePipelineCommandProto = 8;
+ optional SetNodeOperationalStateCommandProto setNodeOperationalStateCommandProto = 9;
}
/**
@@ -384,6 +386,12 @@ message ClosePipelineCommandProto {
required int64 cmdId = 2;
}
+message SetNodeOperationalStateCommandProto {
+ required int64 cmdId = 1;
+ required NodeOperationalState nodeOperationalState = 2;
+ required int64 stateExpiryEpochSeconds = 3;
+}
+
/**
* Protocol used from a datanode to StorageContainerManager.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
index 4d2d895..a1d47fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
@@ -357,7 +357,8 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor {
private void setNodeOpState(DatanodeAdminNodeDetails dn,
HddsProtos.NodeOperationalState state) throws NodeNotFoundException {
- nodeManager.setNodeOperationalState(dn.getDatanodeDetails(), state);
+ nodeManager.setNodeOperationalState(dn.getDatanodeDetails(), state,
+ dn.getMaintenanceEnd() / 1000);
}
private NodeStatus getNodeStatus(DatanodeDetails dnd)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
index c23fcd2..9c8a905 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
@@ -107,6 +107,14 @@ public class DatanodeAdminNodeDetails {
}
/**
+ * Returns the maintenance end time as milliseconds from the epoch.
+ * @return The maintenance end time, or zero if no end time is set.
+ */
+ public long getMaintenanceEnd() {
+ return maintenanceEndTime;
+ }
+
+ /**
* Matches only on the DatanodeDetails field, which compares only the UUID
of the node to determine if they are the same object or not.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index 06fe270..3258fef 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -314,9 +314,15 @@ public class NodeDecommissionManager {
throws NodeNotFoundException, InvalidNodeStateException {
NodeStatus nodeStatus = getNodeStatus(dn);
NodeOperationalState opState = nodeStatus.getOperationalState();
+
+ long maintenanceEnd = 0;
+ if (endInHours != 0) {
+ maintenanceEnd =
+ (System.currentTimeMillis() / 1000L) + (endInHours * 60L * 60L);
+ }
if (opState == NodeOperationalState.IN_SERVICE) {
nodeManager.setNodeOperationalState(
- dn, NodeOperationalState.ENTERING_MAINTENANCE);
+ dn, NodeOperationalState.ENTERING_MAINTENANCE, maintenanceEnd);
monitor.startMonitoring(dn, endInHours);
LOG.info("Starting Maintenance for node {}", dn);
} else if (nodeStatus.isMaintenance()) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index b595d00..44ab581 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -142,6 +142,18 @@ public interface NodeManager extends StorageContainerNodeProtocol,
NodeOperationalState newState) throws NodeNotFoundException;
/**
+ * Set the operation state of a node.
+ * @param datanodeDetails The datanode to set the new state for
+ * @param newState The new operational state for the node
+ * @param opStateExpiryEpocSec Seconds from the epoch when the operational
+ * state should end. Zero indicates the state
+ * never ends.
+ */
+ void setNodeOperationalState(DatanodeDetails datanodeDetails,
+ NodeOperationalState newState,
+ long opStateExpiryEpocSec) throws NodeNotFoundException;
+
+ /**
* Get set of pipelines a datanode is part of.
* @param datanodeDetails DatanodeDetails
* @return Set of PipelineID
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index b16e79c..f18ac60 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -358,10 +358,28 @@ public class NodeStateManager implements Runnable, Closeable {
*/
public void setNodeOperationalState(DatanodeDetails dn,
NodeOperationalState newState) throws NodeNotFoundException {
+ setNodeOperationalState(dn, newState, 0);
+ }
+
+ /**
+ * Sets the operational state of the given node. Intended to be called when
+ * a node is being decommissioned etc.
+ *
+ * @param dn The datanode having its state set
+ * @param newState The new operational State of the node.
+ * @param stateExpiryEpochSec The number of seconds from the epoch when the
+ * operational state should expire. Passing zero
+ * indicates the state will never expire
+ */
+ public void setNodeOperationalState(DatanodeDetails dn,
+ NodeOperationalState newState,
+ long stateExpiryEpochSec) throws NodeNotFoundException {
DatanodeInfo dni = nodeStateMap.getNodeInfo(dn.getUuid());
NodeStatus oldStatus = dni.getNodeStatus();
- if (oldStatus.getOperationalState() != newState) {
- nodeStateMap.updateNodeOperationalState(dn.getUuid(), newState);
+ if (oldStatus.getOperationalState() != newState ||
+ oldStatus.getOpStateExpiryEpochSeconds() != stateExpiryEpochSec) {
+ nodeStateMap.updateNodeOperationalState(
+ dn.getUuid(), newState, stateExpiryEpochSec);
// This will trigger an event based on the nodes health when the
// operational state changes. Eg a node that was IN_MAINTENANCE goes
// to IN_SERVICE + HEALTHY. This will trigger the HEALTHY node event to
@@ -370,7 +388,9 @@ public class NodeStateManager implements Runnable, Closeable {
// container replicas. Sometimes the event will do nothing, but it will
// not do any harm either. Eg DECOMMISSIONING -> DECOMMISSIONED + HEALTHY
// but the pipeline creation logic will ignore decommissioning nodes.
- fireHealthStateEvent(oldStatus.getHealth(), dn);
+ if (oldStatus.getOperationalState() != newState) {
+ fireHealthStateEvent(oldStatus.getHealth(), dn);
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index b8a92e5..72ca015 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -25,17 +25,28 @@ import java.util.Objects;
/**
* This class is used to capture the current status of a datanode. This
* includes its health (healthy, stale or dead) and its operation status (
- * in_service, decommissioned and maintenance mode.
+ * in_service, decommissioned and maintenance mode) along with the expiry time
+ * for the operational state (used with maintenance mode).
*/
public class NodeStatus {
private HddsProtos.NodeOperationalState operationalState;
private HddsProtos.NodeState health;
+ private long opStateExpiryEpochSeconds;
public NodeStatus(HddsProtos.NodeOperationalState operationalState,
HddsProtos.NodeState health) {
this.operationalState = operationalState;
this.health = health;
+ this.opStateExpiryEpochSeconds = 0;
+ }
+
+ public NodeStatus(HddsProtos.NodeOperationalState operationalState,
+ HddsProtos.NodeState health,
+ long opStateExpireEpocSeconds) {
+ this.operationalState = operationalState;
+ this.health = health;
+ this.opStateExpiryEpochSeconds = opStateExpireEpocSeconds;
}
public static NodeStatus inServiceHealthy() {
@@ -61,6 +72,17 @@ public class NodeStatus {
return operationalState;
}
+ public long getOpStateExpiryEpochSeconds() {
+ return opStateExpiryEpochSeconds;
+ }
+
+ public boolean operationalStateExpired() {
+ if (0 == opStateExpiryEpochSeconds) {
+ return false;
+ }
+ return System.currentTimeMillis() / 1000 >= opStateExpiryEpochSeconds;
+ }
+
/**
* Returns true if the nodeStatus indicates the node is in any decommission
* state.
@@ -163,7 +185,8 @@ public class NodeStatus {
}
NodeStatus other = (NodeStatus) obj;
if (this.operationalState == other.operationalState &&
- this.health == other.health) {
+ this.health == other.health
+ && this.opStateExpiryEpochSeconds == other.opStateExpiryEpochSeconds) {
return true;
}
return false;
@@ -171,12 +194,13 @@ public class NodeStatus {
@Override
public int hashCode() {
- return Objects.hash(health, operationalState);
+ return Objects.hash(health, operationalState, opStateExpiryEpochSeconds);
}
@Override
public String toString() {
- return "OperationalState: "+operationalState+" Health: "+health;
+ return "OperationalState: "+operationalState+" Health: "+health+
+ " OperastionStateExpiry: "+opStateExpiryEpochSeconds;
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index d89dac1..3f17e32 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -64,11 +65,13 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,7 +234,23 @@ public class SCMNodeManager implements NodeManager {
@Override
public void setNodeOperationalState(DatanodeDetails datanodeDetails,
NodeOperationalState newState) throws NodeNotFoundException{
- nodeStateManager.setNodeOperationalState(datanodeDetails, newState);
+ setNodeOperationalState(datanodeDetails, newState, 0);
+ }
+
+ /**
+ * Set the operation state of a node.
+ * @param datanodeDetails The datanode to set the new state for
+ * @param newState The new operational state for the node
+ * @param opStateExpiryEpocSec Seconds from the epoch when the operational
+ * state should end. Zero indicates the state
+ * never ends.
+ */
+ @Override
+ public void setNodeOperationalState(DatanodeDetails datanodeDetails,
+ NodeOperationalState newState, long opStateExpiryEpocSec)
+ throws NodeNotFoundException{
+ nodeStateManager.setNodeOperationalState(
+ datanodeDetails, newState, opStateExpiryEpocSec);
}
/**
@@ -312,6 +331,7 @@ public class SCMNodeManager implements NodeManager {
datanodeDetails.toString());
}
}
+ registerInitialDatanodeOpState(datanodeDetails);
return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
.setDatanode(datanodeDetails)
@@ -320,6 +340,29 @@ public class SCMNodeManager implements NodeManager {
}
/**
+ * When a node registers with SCM, the operational state stored on the
+ * datanode is the source of truth. Therefore, if the datanode reports
+ * anything other than IN_SERVICE on registration, the state in SCM should be
+ * updated to reflect the datanode state.
+ * @param dn
+ */
+ private void registerInitialDatanodeOpState(DatanodeDetails dn) {
+ try {
+ HddsProtos.NodeOperationalState dnOpState = dn.getPersistedOpState();
+ if (dnOpState != NodeOperationalState.IN_SERVICE) {
+ LOG.info("Updating nodeOperationalState on registration as the " +
+ "datanode has a persisted state of {} and expiry of {}",
+ dnOpState, dn.getPersistedOpStateExpiryEpochSec());
+ setNodeOperationalState(dn, dnOpState,
+ dn.getPersistedOpStateExpiryEpochSec());
+ }
+ } catch (NodeNotFoundException e) {
+ LOG.error("Unable to find the node when setting the operational state",
+ e);
+ }
+ }
+
+ /**
* Add an entry to the dnsToUuidMap, which maps hostname / IP to the DNs
* running on that host. As each address can have many DNs running on it,
* this is a one to many mapping.
@@ -352,6 +395,7 @@ public class SCMNodeManager implements NodeManager {
try {
nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
metrics.incNumHBProcessed();
+ updateDatanodeOpState(datanodeDetails);
} catch (NodeNotFoundException e) {
metrics.incNumHBProcessingFailed();
LOG.error("SCM trying to process heartbeat from an " +
@@ -360,6 +404,41 @@ public class SCMNodeManager implements NodeManager {
return commandQueue.getCommand(datanodeDetails.getUuid());
}
+ /**
+ * If the operational state or expiry reported in the datanode heartbeat do
+ * not match those stored in SCM, queue a command to update the state persisted
+ * on the datanode. Additionally, ensure the datanodeDetails stored in SCM
+ * match those reported in the heartbeat.
+ * This method should only be called when processing the
+ * heartbeat, and for a registered node, the information stored in SCM is the
+ * source of truth.
+ * @param reportedDn The DatanodeDetails taken from the node heartbeat.
+ * @throws NodeNotFoundException
+ */
+ private void updateDatanodeOpState(DatanodeDetails reportedDn)
+ throws NodeNotFoundException {
+ NodeStatus scmStatus = getNodeStatus(reportedDn);
+ if (scmStatus.getOperationalState() != reportedDn.getPersistedOpState()
+ || scmStatus.getOpStateExpiryEpochSeconds()
+ != reportedDn.getPersistedOpStateExpiryEpochSec()) {
+ LOG.info("Scheduling a command to update the operationalState " +
+ "persisted on the datanode as the reported value ({}, {}) does not " +
+ "match the value stored in SCM ({}, {})",
+ reportedDn.getPersistedOpState(),
+ reportedDn.getPersistedOpStateExpiryEpochSec(),
+ scmStatus.getOperationalState(),
+ scmStatus.getOpStateExpiryEpochSeconds());
+ commandQueue.addCommand(reportedDn.getUuid(),
+ new SetNodeOperationalStateCommand(
+ Time.monotonicNow(), scmStatus.getOperationalState(),
+ scmStatus.getOpStateExpiryEpochSeconds()));
+ }
+ DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
+ scmDnd.setPersistedOpStateExpiryEpochSec(
+ reportedDn.getPersistedOpStateExpiryEpochSec());
+ scmDnd.setPersistedOpState(reportedDn.getPersistedOpState());
+ }
+
@Override
public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
try {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
index 6565e81..3cf232a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
@@ -113,13 +113,14 @@ public class NodeStateMap {
* @throws NodeNotFoundException if the node is not present
*/
public NodeStatus updateNodeOperationalState(UUID nodeId,
- NodeOperationalState newOpState) throws NodeNotFoundException {
+ NodeOperationalState newOpState, long opStateExpiryEpochSeconds)
+ throws NodeNotFoundException {
try {
lock.writeLock().lock();
DatanodeInfo dn = getNodeInfo(nodeId);
NodeStatus oldStatus = dn.getNodeStatus();
NodeStatus newStatus = new NodeStatus(
- newOpState, oldStatus.getHealth());
+ newOpState, oldStatus.getHealth(), opStateExpiryEpochSeconds);
dn.setNodeStatus(newStatus);
return newStatus;
} finally {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index d569044..98a25d2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
@@ -87,6 +88,7 @@ import static org.apache.hadoop.hdds.protocol.proto
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
.closePipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.setNodeOperationalStateCommand;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
@@ -350,6 +352,12 @@ public class SCMDatanodeProtocolServer implements
.setClosePipelineCommandProto(
((ClosePipelineCommand)cmd).getProto())
.build();
+ case setNodeOperationalStateCommand:
+ return builder
+ .setCommandType(setNodeOperationalStateCommand)
+ .setSetNodeOperationalStateCommandProto(
+ ((SetNodeOperationalStateCommand)cmd).getProto())
+ .build();
default:
throw new IllegalArgumentException("Scm command " +
cmd.getType().toString() + " is not implemented");
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index d24cbfd..da04b71 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -279,6 +279,16 @@ public class MockNodeManager implements NodeManager {
}
/**
+ * Set the operation state of a node.
+ * @param datanodeDetails The datanode to set the new state for
+ * @param newState The new operational state for the node
+ */
+ public void setNodeOperationalState(DatanodeDetails datanodeDetails,
+ HddsProtos.NodeOperationalState newState, long opStateExpiryEpocSec)
+ throws NodeNotFoundException {
+ }
+
+ /**
* Get set of pipelines a datanode is part of.
* @param dnId - datanodeID
* @return Set of PipelineID
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index dad3448..6982d03 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -100,12 +100,20 @@ public class SimpleMockNodeManager implements NodeManager {
@Override
public void setNodeOperationalState(DatanodeDetails dn,
HddsProtos.NodeOperationalState newState) throws NodeNotFoundException {
+ setNodeOperationalState(dn, newState, 0);
+ }
+
+ @Override
+ public void setNodeOperationalState(DatanodeDetails dn,
+ HddsProtos.NodeOperationalState newState, long opStateExpiryEpocSec)
+ throws NodeNotFoundException {
DatanodeInfo dni = nodeMap.get(dn.getUuid());
if (dni == null) {
throw new NodeNotFoundException();
}
dni.setNodeStatus(
- new NodeStatus(newState, dni.getNodeStatus().getHealth()));
+ new NodeStatus(
+ newState, dni.getNodeStatus().getHealth(), opStateExpiryEpocSec));
}
/**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
index df62438..a8bcf54 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
@@ -36,6 +36,7 @@ import java.util.Arrays;
import java.util.ArrayList;
import static junit.framework.TestCase.assertEquals;
import static org.assertj.core.api.Fail.fail;
+import static org.junit.Assert.assertNotEquals;
/**
* Unit tests for the decommision manager.
@@ -182,8 +183,12 @@ public class TestNodeDecommissionManager {
dns.get(2).getIpAddress()), 100);
assertEquals(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
nodeManager.getNodeStatus(dns.get(1)).getOperationalState());
+ assertNotEquals(0, nodeManager.getNodeStatus(
+ dns.get(1)).getOpStateExpiryEpochSeconds());
assertEquals(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE,
nodeManager.getNodeStatus(dns.get(2)).getOperationalState());
+ assertNotEquals(0, nodeManager.getNodeStatus(
+ dns.get(2)).getOpStateExpiryEpochSeconds());
// Running the command again gives no error - nodes already decommissioning
// are silently ignored.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 0917fa4..b78a76f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
@@ -275,6 +276,35 @@ public class TestSCMNodeManager {
}
/**
+ * Ensure that a change to the operationalState of a node fires a datanode
+ * event of type SetNodeOperationalStateCommand.
+ */
+ @Test
+ @Ignore // TODO - this test is no longer valid as the heartbeat processing
+ // now generates the command message.
+ public void testSetNodeOpStateAndCommandFired()
+ throws IOException, NodeNotFoundException, AuthenticationException {
+ final int interval = 100;
+
+ OzoneConfiguration conf = getConf();
+ conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
+ MILLISECONDS);
+
+ try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+ DatanodeDetails dn = TestUtils.createRandomDatanodeAndRegister(
+ nodeManager);
+ long expiry = System.currentTimeMillis() / 1000 + 1000;
+ nodeManager.setNodeOperationalState(dn,
+ HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, expiry);
+ List<SCMCommand> commands = nodeManager.getCommandQueue(dn.getUuid());
+
+ Assert.assertTrue(commands.get(0).getClass().equals(
+ SetNodeOperationalStateCommand.class));
+ assertEquals(1, commands.size());
+ }
+ }
+
+ /**
* Asserts that a single node moves from Healthy to stale node, then from
* stale node to dead node if it misses enough heartbeats.
*
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
index 482f444..ad7139f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
@@ -82,11 +82,12 @@ public class TestNodeStateMap {
NodeStatus expectedStatus = new NodeStatus(
NodeOperationalState.DECOMMISSIONING,
- NodeState.HEALTHY);
+ NodeState.HEALTHY, 999);
NodeStatus returnedStatus = map.updateNodeOperationalState(
- dn.getUuid(), expectedStatus.getOperationalState());
+ dn.getUuid(), expectedStatus.getOperationalState(), 999);
assertEquals(expectedStatus, returnedStatus);
assertEquals(returnedStatus, map.getNodeStatus(dn.getUuid()));
+ assertEquals(999, returnedStatus.getOpStateExpiryEpochSeconds());
}
@Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index ee96565..9654715 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -191,9 +191,22 @@ public class ReplicationNodeManagerMock implements NodeManager {
@Override
public void setNodeOperationalState(DatanodeDetails dd,
HddsProtos.NodeOperationalState newState) throws NodeNotFoundException {
+ setNodeOperationalState(dd, newState, 0);
+ }
+
+ /**
+ * Set the operation state of a node.
+ * @param dd The datanode to set the new state for
+ * @param newState The new operational state for the node
+ */
+ @Override
+ public void setNodeOperationalState(DatanodeDetails dd,
+ HddsProtos.NodeOperationalState newState, long opStateExpiryEpocSec)
+ throws NodeNotFoundException {
NodeStatus currentStatus = nodeStateMap.get(dd);
if (currentStatus != null) {
- nodeStateMap.put(dd, new NodeStatus(newState, currentStatus.getHealth()));
+ nodeStateMap.put(dd, new NodeStatus(newState, currentStatus.getHealth(),
+ opStateExpiryEpocSec));
} else {
throw new NodeNotFoundException();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org