You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2022/05/13 16:22:16 UTC
[hadoop] branch branch-2.10 updated: YARN-11125. Backport YARN-6483 to branch-2.10 (#4258)
This is an automated email from the ASF dual-hosted git repository.
aajisaka pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new ec5f3e4ed16 YARN-11125. Backport YARN-6483 to branch-2.10 (#4258)
ec5f3e4ed16 is described below
commit ec5f3e4ed16c4aedf06fcbf0eda28e7bcfe8d22e
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Fri May 13 17:21:37 2022 +0100
YARN-11125. Backport YARN-6483 to branch-2.10 (#4258)
Co-authored-by: Ashutosh Gupta <as...@amazon.com>
Signed-off-by: Akira Ajisaka <aa...@apache.org>
---
.../apache/hadoop/yarn/api/records/NodeReport.java | 47 +++++--
.../hadoop/yarn/api/records/NodeUpdateType.java} | 11 +-
.../src/main/proto/yarn_protos.proto | 8 ++
.../hadoop/yarn/client/ProtocolHATestBase.java | 50 ++++----
.../apache/hadoop/yarn/client/cli/TestYarnCLI.java | 40 +++---
.../yarn/api/records/impl/pb/NodeReportPBImpl.java | 50 +++++++-
.../yarn/api/records/impl/pb/ProtoUtils.java | 12 ++
.../hadoop/yarn/server/utils/BuilderUtils.java | 14 ++-
.../server/resourcemanager/ClientRMService.java | 3 +-
.../DecommissioningNodesWatcher.java | 38 +-----
.../resourcemanager/DefaultAMSProcessor.java | 12 +-
.../server/resourcemanager/NodesListManager.java | 84 +++++++++----
.../resourcemanager/NodesListManagerEventType.java | 3 +-
.../yarn/server/resourcemanager/rmapp/RMApp.java | 10 +-
.../server/resourcemanager/rmapp/RMAppImpl.java | 11 +-
.../rmapp/RMAppNodeUpdateEvent.java | 9 +-
.../yarn/server/resourcemanager/rmnode/RMNode.java | 2 +-
.../server/resourcemanager/rmnode/RMNodeImpl.java | 5 +
.../hadoop/yarn/server/resourcemanager/MockRM.java | 21 +++-
.../resourcemanager/TestClientRMService.java | 89 ++++++++++---
.../TestDecommissioningNodesWatcher.java | 11 +-
.../resourcemanager/TestRMNodeTransitions.java | 49 ++++----
.../TestResourceTrackerService.java | 139 ++++++++++++++++++---
.../applicationsmanager/MockAsm.java | 4 +-
.../TestAMRMRPCNodeUpdates.java | 107 +++++++++++-----
.../server/resourcemanager/rmapp/MockRMApp.java | 4 +-
26 files changed, 598 insertions(+), 235 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
index 885a3b4b35a..3a80641bb6d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
@@ -53,7 +53,8 @@ public abstract class NodeReport {
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime) {
return newInstance(nodeId, nodeState, httpAddress, rackName, used,
- capability, numContainers, healthReport, lastHealthReportTime, null);
+ capability, numContainers, healthReport, lastHealthReportTime,
+ null, null, null);
}
@Private
@@ -61,7 +62,8 @@ public abstract class NodeReport {
public static NodeReport newInstance(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime,
- Set<String> nodeLabels) {
+ Set<String> nodeLabels, Integer decommissioningTimeout,
+ NodeUpdateType nodeUpdateType) {
NodeReport nodeReport = Records.newRecord(NodeReport.class);
nodeReport.setNodeId(nodeId);
nodeReport.setNodeState(nodeState);
@@ -73,6 +75,8 @@ public abstract class NodeReport {
nodeReport.setHealthReport(healthReport);
nodeReport.setLastHealthReportTime(lastHealthReportTime);
nodeReport.setNodeLabels(nodeLabels);
+ nodeReport.setDecommissioningTimeout(decommissioningTimeout);
+ nodeReport.setNodeUpdateType(nodeUpdateType);
return nodeReport;
}
@@ -186,8 +190,8 @@ public abstract class NodeReport {
public abstract void setLastHealthReportTime(long lastHealthReport);
/**
- * Get labels of this node
- * @return labels of this node
+ * Get labels of this node.
+ * @return labels of this node.
*/
@Public
@Stable
@@ -198,8 +202,8 @@ public abstract class NodeReport {
public abstract void setNodeLabels(Set<String> nodeLabels);
/**
- * Get containers aggregated resource utilization in a node
- * @return containers resource utilization
+ * Get containers aggregated resource utilization in a node.
+ * @return containers resource utilization.
*/
@Public
@Stable
@@ -217,8 +221,8 @@ public abstract class NodeReport {
}
/**
- * Get node resource utilization
- * @return node resource utilization
+ * Get node resource utilization.
+ * @return node resource utilization.
*/
@Public
@Stable
@@ -227,4 +231,31 @@ public abstract class NodeReport {
@Private
@Unstable
public abstract void setNodeUtilization(ResourceUtilization nodeUtilization);
+
+ /**
+ * Optional decommissioning timeout in seconds (null indicates absent
+ * timeout).
+ * @return the decommissioning timeout in seconds.
+ */
+ public Integer getDecommissioningTimeout() {
+ return null;
+ }
+
+ /**
+ * Set the decommissioning timeout in seconds (null indicates absent timeout).
+ * */
+ public void setDecommissioningTimeout(Integer decommissioningTimeout) {}
+
+ /**
+ * Optional node update type (null indicates absent update type).
+ * @return the node update type.
+ */
+ public NodeUpdateType getNodeUpdateType() {
+ return NodeUpdateType.NODE_UNUSABLE;
+ }
+
+ /**
+ * Set the node update type (null indicates absent node update type).
+ * */
+ public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java
similarity index 78%
copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java
index 2afc8e6e4a7..9152a6a3d9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java
@@ -16,9 +16,14 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.resourcemanager;
+package org.apache.hadoop.yarn.api.records;
-public enum NodesListManagerEventType {
+/**
+ * <p>Taxonomy of the <code>NodeState</code> that a
+ * <code>Node</code> might transition into.</p>
+ * */
+public enum NodeUpdateType {
NODE_USABLE,
- NODE_UNUSABLE
+ NODE_UNUSABLE,
+ NODE_DECOMMISSIONING
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 214a2b812f0..5b2d51c5bfb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -325,6 +325,12 @@ message NodeIdProto {
optional int32 port = 2;
}
+enum NodeUpdateTypeProto {
+ NODE_USABLE = 0;
+ NODE_UNUSABLE = 1;
+ NODE_DECOMMISSIONING = 2;
+}
+
message NodeReportProto {
optional NodeIdProto nodeId = 1;
optional string httpAddress = 2;
@@ -338,6 +344,8 @@ message NodeReportProto {
repeated string node_labels = 10;
optional ResourceUtilizationProto containers_utilization = 11;
optional ResourceUtilizationProto node_utilization = 12;
+ optional uint32 decommissioning_timeout = 13;
+ optional NodeUpdateTypeProto node_update_type = 14;
}
message NodeIdToLabelsProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index 9bfd606cfdf..c56fa1a8916 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -18,38 +18,30 @@
package org.apache.hadoop.yarn.client;
-import com.google.common.base.Supplier;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.CollectorInfo;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
+import com.google.common.base.Supplier;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@@ -78,14 +70,18 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
@@ -117,6 +113,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
@@ -130,8 +127,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.Before;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
@@ -628,7 +628,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
}
public ApplicationReport createFakeAppReport() {
- ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+ ApplicationId appId = ApplicationId.newInstance(1000L, 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
// create a fake application report
@@ -648,7 +648,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
}
public ApplicationId createFakeAppId() {
- return ApplicationId.newInstance(1000l, 1);
+ return ApplicationId.newInstance(1000L, 1);
}
public ApplicationAttemptId createFakeApplicationAttemptId() {
@@ -667,7 +667,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
NodeId nodeId = NodeId.newInstance("localhost", 0);
NodeReport report =
NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost",
- "rack1", null, null, 4, null, 1000l, null);
+ "rack1", null, null, 4, null, 1000L);
List<NodeReport> reports = new ArrayList<NodeReport>();
reports.add(report);
return reports;
@@ -691,8 +691,8 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public ApplicationAttemptReport createFakeApplicationAttemptReport() {
return ApplicationAttemptReport.newInstance(
createFakeApplicationAttemptId(), "localhost", 0, "", "", "",
- YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000l,
- 1200l);
+ YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000L,
+ 1200L);
}
public List<ApplicationAttemptReport>
@@ -705,7 +705,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public ContainerReport createFakeContainerReport() {
return ContainerReport.newInstance(createFakeContainerId(), null,
- NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
+ NodeId.newInstance("localhost", 0), null, 1000L, 1200L, "", "", 0,
ContainerState.COMPLETE,
"http://" + NodeId.newInstance("localhost", 0).toString());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index bc21fdeca4b..1b2c13979b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -17,18 +17,6 @@
*/
package org.apache.hadoop.yarn.client.cli;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
@@ -46,6 +34,13 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.hadoop.conf.Configuration;
@@ -88,14 +83,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mortbay.log.Log;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
public class TestYarnCLI {
@@ -2072,7 +2072,7 @@ public class TestYarnCLI {
NodeReport nodeReport = NodeReport.newInstance(NodeId
.newInstance("host" + i, 0), state, "host" + 1 + ":8888",
"rack1", Records.newRecord(Resource.class), Records
- .newRecord(Resource.class), 0, "", 0, nodeLabels);
+ .newRecord(Resource.class), 0, "", 0, nodeLabels, null, null);
if (!emptyResourceUtilization) {
ResourceUtilization containersUtilization = ResourceUtilization
.newInstance(1024, 2048, 4);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
index 0d205e90da1..ced588d30e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -150,8 +151,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public void setNodeId(NodeId nodeId) {
maybeInitBuilder();
- if (nodeId == null)
+ if (nodeId == null) {
builder.clearNodeId();
+ }
this.nodeId = nodeId;
}
@@ -177,8 +179,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public void setCapability(Resource capability) {
maybeInitBuilder();
- if (capability == null)
+ if (capability == null) {
builder.clearCapability();
+ }
this.capability = capability;
}
@@ -215,8 +218,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public void setUsed(Resource used) {
maybeInitBuilder();
- if (used == null)
+ if (used == null) {
builder.clearUsed();
+ }
this.used = used;
}
@@ -234,8 +238,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public boolean equals(Object other) {
- if (other == null)
+ if (other == null) {
return false;
+ }
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
@@ -278,8 +283,9 @@ public class NodeReportPBImpl extends NodeReport {
}
private void mergeLocalToProto() {
- if (viaProto)
+ if (viaProto) {
maybeInitBuilder();
+ }
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
@@ -387,4 +393,38 @@ public class NodeReportPBImpl extends NodeReport {
}
this.nodeUtilization = nodeResourceUtilization;
}
+
+ @Override
+ public Integer getDecommissioningTimeout() {
+ NodeReportProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.hasDecommissioningTimeout())
+ ? p.getDecommissioningTimeout() : null;
+ }
+
+ @Override
+ public void setDecommissioningTimeout(Integer decommissioningTimeout) {
+ maybeInitBuilder();
+ if (decommissioningTimeout == null || decommissioningTimeout < 0) {
+ builder.clearDecommissioningTimeout();
+ return;
+ }
+ builder.setDecommissioningTimeout(decommissioningTimeout);
+ }
+
+ @Override
+ public NodeUpdateType getNodeUpdateType() {
+ NodeReportProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.hasNodeUpdateType()) ?
+ ProtoUtils.convertFromProtoFormat(p.getNodeUpdateType()) : null;
+ }
+
+ @Override
+ public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {
+ maybeInitBuilder();
+ if (nodeUpdateType == null) {
+ builder.clearNodeUpdateType();
+ return;
+ }
+ builder.setNodeUpdateType(ProtoUtils.convertToProtoFormat(nodeUpdateType));
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 8176068984f..e9c89c536ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
@@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypesProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeUpdateTypeProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -356,6 +358,16 @@ public class ProtoUtils {
return ContainerRetryPolicy.valueOf(e.name());
}
+ /*
+ * NodeUpdateType
+ */
+ public static NodeUpdateTypeProto convertToProtoFormat(NodeUpdateType e) {
+ return NodeUpdateTypeProto.valueOf(e.name());
+ }
+ public static NodeUpdateType convertFromProtoFormat(NodeUpdateTypeProto e) {
+ return NodeUpdateType.valueOf(e.name());
+ }
+
/*
* ExecutionType
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index cc3bf38046d..b6145c99ef0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -187,23 +188,26 @@ public class BuilderUtils {
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime) {
return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
- capability, numContainers, healthReport, lastHealthReportTime, null);
+ capability, numContainers, healthReport, lastHealthReportTime,
+ null, null, null);
}
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime,
- Set<String> nodeLabels) {
+ Set<String> nodeLabels, Integer decommissioningTimeout,
+ NodeUpdateType nodeUpdateType) {
return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
capability, numContainers, healthReport, lastHealthReportTime,
- nodeLabels, null, null);
+ nodeLabels, null, null, decommissioningTimeout, nodeUpdateType);
}
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime,
Set<String> nodeLabels, ResourceUtilization containersUtilization,
- ResourceUtilization nodeUtilization) {
+ ResourceUtilization nodeUtilization, Integer decommissioningTimeout,
+ NodeUpdateType nodeUpdateType) {
NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class);
nodeReport.setNodeId(nodeId);
nodeReport.setNodeState(nodeState);
@@ -217,6 +221,8 @@ public class BuilderUtils {
nodeReport.setNodeLabels(nodeLabels);
nodeReport.setAggregatedContainersUtilization(containersUtilization);
nodeReport.setNodeUtilization(nodeUtilization);
+ nodeReport.setDecommissioningTimeout(decommissioningTimeout);
+ nodeReport.setNodeUpdateType(nodeUpdateType);
return nodeReport;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 98322b1a292..b01d606d151 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -1101,7 +1101,8 @@ public class ClientRMService extends AbstractService implements
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(),
- rmNode.getNodeUtilization());
+ rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout(),
+ null);
return report;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
index 9631803e3fd..ca3eb798414 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
@@ -72,11 +72,6 @@ public class DecommissioningNodesWatcher {
private final RMContext rmContext;
- // Default timeout value in mills.
- // Negative value indicates no timeout. 0 means immediate.
- private long defaultTimeoutMs =
- 1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
-
// Once a RMNode is observed in DECOMMISSIONING state,
// All its ContainerStatus update are tracked inside DecomNodeContext.
class DecommissioningNodeContext {
@@ -105,16 +100,15 @@ public class DecommissioningNodesWatcher {
private long lastUpdateTime;
- public DecommissioningNodeContext(NodeId nodeId) {
+ public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) {
this.nodeId = nodeId;
this.appIds = new HashSet<ApplicationId>();
this.decommissioningStartTime = mclock.getTime();
- this.timeoutMs = defaultTimeoutMs;
+ this.timeoutMs = 1000L * timeoutSec;
}
- void updateTimeout(Integer timeoutSec) {
- this.timeoutMs = (timeoutSec == null)?
- defaultTimeoutMs : (1000L * timeoutSec);
+ void updateTimeout(int timeoutSec) {
+ this.timeoutMs = 1000L * timeoutSec;
}
}
@@ -132,7 +126,6 @@ public class DecommissioningNodesWatcher {
}
public void init(Configuration conf) {
- readDecommissioningTimeout(conf);
int v = conf.getInt(
YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
YarnConfiguration
@@ -162,7 +155,8 @@ public class DecommissioningNodesWatcher {
}
} else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
if (context == null) {
- context = new DecommissioningNodeContext(rmNode.getNodeID());
+ context = new DecommissioningNodeContext(rmNode.getNodeID(),
+ rmNode.getDecommissioningTimeout());
decomNodes.put(rmNode.getNodeID(), context);
context.nodeState = rmNode.getState();
context.decommissionedTime = 0;
@@ -416,24 +410,4 @@ public class DecommissioningNodesWatcher {
LOG.debug("Decommissioning node: " + sb.toString());
}
}
-
- // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
- // This enables DecommissioningNodesWatcher to pick up new value
- // without ResourceManager restart.
- private void readDecommissioningTimeout(Configuration conf) {
- try {
- if (conf == null) {
- conf = new YarnConfiguration();
- }
- int v = conf.getInt(
- YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
- YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
- if (defaultTimeoutMs != 1000L * v) {
- defaultTimeoutMs = 1000L * v;
- LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs);
- }
- } catch (Exception e) {
- LOG.info("Error readDecommissioningTimeout ", e);
- }
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index a18b4c7e0ff..cab57f828f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -83,6 +84,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
/**
@@ -326,10 +329,12 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
}
private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {
- List<RMNode> updatedNodes = new ArrayList<>();
+ Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
List<NodeReport> updatedNodeReports = new ArrayList<>();
- for(RMNode rmNode: updatedNodes) {
+ for(Map.Entry<RMNode, NodeUpdateType> rmNodeEntry :
+ updatedNodes.entrySet()) {
+ RMNode rmNode = rmNodeEntry.getKey();
SchedulerNodeReport schedulerNodeReport =
getScheduler().getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0, 0);
@@ -344,7 +349,8 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
rmNode.getHttpAddress(), rmNode.getRackName(), used,
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
- rmNode.getNodeLabels());
+ rmNode.getNodeLabels(), rmNode.getDecommissioningTimeout(),
+ rmNodeEntry.getValue());
updatedNodeReports.add(report);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 4c43b8a6658..a40b12c1017 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -31,6 +31,8 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,8 +42,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.HostsFileReader.HostDetails;
-import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -60,8 +62,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
-import com.google.common.annotations.VisibleForTesting;
-
@SuppressWarnings("unchecked")
public class NodesListManager extends CompositeService implements
EventHandler<NodesListManagerEvent> {
@@ -72,6 +72,11 @@ public class NodesListManager extends CompositeService implements
private Configuration conf;
private final RMContext rmContext;
+ // Default decommissioning timeout value in seconds.
+ // Negative value indicates no timeout. 0 means immediate.
+ private int defaultDecTimeoutSecs =
+ YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
+
private String includesFile;
private String excludesFile;
@@ -214,6 +219,11 @@ public class NodesListManager extends CompositeService implements
private void refreshHostsReader(
Configuration yarnConf, boolean graceful, Integer timeout)
throws IOException, YarnException {
+ // resolve the default timeout to the decommission timeout that is
+ // configured at this moment
+ if (null == timeout) {
+ timeout = readDecommissioningTimeout(yarnConf);
+ }
if (null == yarnConf) {
yarnConf = new YarnConfiguration();
}
@@ -252,7 +262,7 @@ public class NodesListManager extends CompositeService implements
// Gracefully decommission excluded nodes that are not already
// DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes
// that are already DECOMMISSIONED or DECOMMISSIONING.
- private void handleExcludeNodeList(boolean graceful, Integer timeout) {
+ private void handleExcludeNodeList(boolean graceful, int timeout) {
// DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned.
List<RMNode> nodesToRecom = new ArrayList<RMNode>();
@@ -458,36 +468,40 @@ public class NodesListManager extends CompositeService implements
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
}
+ private void sendRMAppNodeUpdateEventToNonFinalizedApps(
+ RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) {
+ for(RMApp app : rmContext.getRMApps().values()) {
+ if (!app.isAppFinalStateStored()) {
+ this.rmContext
+ .getDispatcher()
+ .getEventHandler()
+ .handle(
+ new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+ appNodeUpdateType));
+ }
+ }
+ }
+
@Override
public void handle(NodesListManagerEvent event) {
RMNode eventNode = event.getNode();
switch (event.getType()) {
case NODE_UNUSABLE:
LOG.debug(eventNode + " reported unusable");
- for(RMApp app: rmContext.getRMApps().values()) {
- if (!app.isAppFinalStateStored()) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
- RMAppNodeUpdateType.NODE_UNUSABLE));
- }
- }
+ sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+ RMAppNodeUpdateType.NODE_UNUSABLE);
break;
case NODE_USABLE:
LOG.debug(eventNode + " reported usable");
- for (RMApp app : rmContext.getRMApps().values()) {
- if (!app.isAppFinalStateStored()) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
- RMAppNodeUpdateType.NODE_USABLE));
- }
- }
+ sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+ RMAppNodeUpdateType.NODE_USABLE);
+ break;
+ case NODE_DECOMMISSIONING:
+ LOG.debug(eventNode + " reported decommissioning");
+ sendRMAppNodeUpdateEventToNonFinalizedApps(
+ eventNode, RMAppNodeUpdateType.NODE_DECOMMISSIONING);
break;
+
default:
LOG.error("Ignoring invalid eventtype " + event.getType());
}
@@ -606,6 +620,28 @@ public class NodesListManager extends CompositeService implements
}
}
+ // Read a possibly updated RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT value from
+ // the configuration. This enables NodesListManager to pick up a new value
+ // without a ResourceManager restart.
+ private int readDecommissioningTimeout(Configuration pConf) {
+ try {
+ if (pConf == null) {
+ pConf = new YarnConfiguration();
+ }
+ int configuredDefaultDecTimeoutSecs =
+ pConf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+ YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+ if (defaultDecTimeoutSecs != configuredDefaultDecTimeoutSecs) {
+ defaultDecTimeoutSecs = configuredDefaultDecTimeoutSecs;
+ LOG.info("Use new decommissioningTimeoutSecs: "
+ + defaultDecTimeoutSecs);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error readDecommissioningTimeout " + e.getMessage());
+ }
+ return defaultDecTimeoutSecs;
+ }
+
/**
* A NodeId instance needed upon startup for populating inactive nodes Map.
* It only knows the hostname/ip and marks the port to -1 or invalid.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
index 2afc8e6e4a7..db16bc4e121 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
public enum NodesListManagerEventType {
NODE_USABLE,
- NODE_UNUSABLE
+ NODE_UNUSABLE,
+ NODE_DECOMMISSIONING
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 936f78b75b4..5711c2bcd16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -154,10 +154,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* received by the RMApp. Updates can be node becoming lost or becoming
* healthy etc. The method clears the information from the {@link RMApp}. So
* each call to this method gives the delta from the previous call.
- * @param updatedNodes Collection into which the updates are transferred
- * @return the number of nodes added to the {@link Collection}
+ * @param updatedNodes Map into which the updates are transferred, with each
+ * updated node as the key, and the {@link NodeUpdateType} for that update
+ * as the corresponding value.
+ * @return the number of nodes added to the {@link Map}
*/
- int pullRMNodeUpdates(Collection<RMNode> updatedNodes);
+ int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes);
/**
* The finish time of the {@link RMApp}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 48a4ed80ccb..d2101df3a2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -20,11 +20,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.net.InetAddress;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -59,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -146,7 +145,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
= new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
private final long submitTime;
- private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
+ private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
private final String applicationType;
private final Set<String> applicationTags;
@@ -703,11 +702,11 @@ public class RMAppImpl implements RMApp, Recoverable {
}
@Override
- public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+ public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> upNodes) {
this.writeLock.lock();
try {
int updatedNodeCount = this.updatedNodes.size();
- updatedNodes.addAll(this.updatedNodes);
+ upNodes.putAll(this.updatedNodes);
this.updatedNodes.clear();
return updatedNodeCount;
} finally {
@@ -1025,7 +1024,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
NodeState nodeState = node.getState();
- updatedNodes.add(node);
+ updatedNodes.put(node, RMAppNodeUpdateType.convertToNodeUpdateType(type));
LOG.debug("Received node update event:" + type + " for node:" + node
+ " with state:" + nodeState);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
index ba8af9801f7..245d9a6edbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
@@ -19,13 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class RMAppNodeUpdateEvent extends RMAppEvent {
public enum RMAppNodeUpdateType {
NODE_USABLE,
- NODE_UNUSABLE
+ NODE_UNUSABLE,
+ NODE_DECOMMISSIONING;
+
+ public static NodeUpdateType convertToNodeUpdateType(
+ RMAppNodeUpdateType rmAppNodeUpdateType) {
+ return NodeUpdateType.valueOf(rmAppNodeUpdateType.name());
+ }
}
private final RMNode node;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 4d603081bf9..ab6e40918fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -179,7 +179,7 @@ public interface RMNode {
/**
* Optional decommissioning timeout in second
- * (null indicates default timeout).
+ * (null indicates absent timeout).
* @return the decommissioning timeout in second.
*/
Integer getDecommissioningTimeout();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 5a188d060c8..fca5115c889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1176,6 +1176,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Update NM metrics during graceful decommissioning.
rmNode.updateMetricsForGracefulDecommission(initState, finalState);
rmNode.decommissioningTimeout = timeout;
+ // Notify NodesListManager so that it can notify the RMApps, letting each
+ // ApplicationMaster take any required action.
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_DECOMMISSIONING, rmNode));
if (rmNode.originalTotalCapability == null){
rmNode.originalTotalCapability =
Resources.clone(rmNode.totalCapability);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 915c719c2d6..e7240b267c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
@@ -28,8 +27,10 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-
import java.util.Set;
+
+import org.junit.Assert;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -92,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
@@ -112,7 +114,6 @@ import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.junit.Assert;
@SuppressWarnings("unchecked")
@@ -956,12 +957,26 @@ public class MockRM extends ResourceManager {
node.getState());
}
+ public void sendNodeGracefulDecommission(
+ MockNM nm, int timeout) throws Exception {
+ RMNodeImpl node = (RMNodeImpl)
+ getRMContext().getRMNodes().get(nm.getNodeId());
+ Assert.assertNotNull("node shouldn't be null", node);
+ node.handle(new RMNodeDecommissioningEvent(nm.getNodeId(), timeout));
+ }
+
public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
RMNodeImpl node = (RMNodeImpl)
getRMContext().getRMNodes().get(nm.getNodeId());
+ Assert.assertNotNull("node shouldn't be null", node);
node.handle(new RMNodeEvent(nm.getNodeId(), event));
}
+ public Integer getDecommissioningTimeout(NodeId nodeid) {
+ return this.getRMContext().getRMNodes()
+ .get(nodeid).getDecommissioningTimeout();
+ }
+
public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
ApplicationClientProtocol client = getClientRMService();
KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 1b13acbc9bc..fe51878fa31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -18,18 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
@@ -51,6 +39,13 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.mockito.Matchers;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -153,22 +148,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
-
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Test;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import org.mockito.Matchers;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
public class TestClientRMService {
@@ -183,6 +182,50 @@ public class TestClientRMService {
private final static String QUEUE_2 = "Q-2";
private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo";
+ @Test
+ public void testGetDecommissioningClusterNodes() throws Exception {
+ MockRM rm = new MockRM() {
+ protected ClientRMService createClientRMService() {
+ return new ClientRMService(this.rmContext, scheduler,
+ this.rmAppManager, this.applicationACLsManager,
+ this.queueACLsManager,
+ this.getRMContext().getRMDelegationTokenSecretManager());
+ };
+ };
+ rm.start();
+
+ int nodeMemory = 1024;
+ MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
+ rm.sendNodeStarted(nm1);
+ nm1.nodeHeartbeat(true);
+ rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
+ Integer decommissioningTimeout = 600;
+ rm.sendNodeGracefulDecommission(nm1, decommissioningTimeout);
+ rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
+
+ // Create a client.
+ Configuration conf = new Configuration();
+ YarnRPC rpc = YarnRPC.create(conf);
+ InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
+ ApplicationClientProtocol client =
+ (ApplicationClientProtocol) rpc
+ .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+
+ // Make call
+ List<NodeReport> nodeReports = client.getClusterNodes(
+ GetClusterNodesRequest.newInstance(
+ EnumSet.of(NodeState.DECOMMISSIONING)))
+ .getNodeReports();
+ Assert.assertEquals(1, nodeReports.size());
+ NodeReport nr = nodeReports.iterator().next();
+ Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
+ Assert.assertNull(nr.getNodeUpdateType());
+
+ rpc.stopProxy(client, conf);
+ rm.close();
+ }
+
@Test
public void testGetClusterNodes() throws Exception {
MockRM rm = new MockRM() {
@@ -231,6 +274,8 @@ public class TestClientRMService {
// Check node's label = x
Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x"));
+ Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
+ Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
// Now make the node unhealthy.
node.nodeHeartbeat(false);
@@ -254,6 +299,8 @@ public class TestClientRMService {
nodeReports.get(0).getNodeState());
Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y"));
+ Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
+ Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
// Remove labels of host1
map = new HashMap<NodeId, Set<String>>();
@@ -270,6 +317,8 @@ public class TestClientRMService {
for (NodeReport report : nodeReports) {
Assert.assertTrue(report.getNodeLabels() != null
&& report.getNodeLabels().isEmpty());
+ Assert.assertNull(report.getDecommissioningTimeout());
+ Assert.assertNull(report.getNodeUpdateType());
}
rpc.stopProxy(client, conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
index 690de308e23..09f1482eb5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.List;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -36,10 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
/**
* This class tests DecommissioningNodesWatcher.
@@ -69,7 +69,8 @@ public class TestDecommissioningNodesWatcher {
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
// Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
- rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION);
+ rm.sendNodeGracefulDecommission(nm1,
+ YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
// Update status with decreasing number of running containers until 0.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 272633269a2..6ecb306fb8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -17,20 +17,21 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -48,8 +49,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
- .AllocationExpirationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -71,13 +71,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestRMNodeTransitions {
@@ -98,13 +100,16 @@ public class TestRMNodeTransitions {
}
private NodesListManagerEvent nodesListManagerEvent = null;
-
+ private List<NodeState> nodesListManagerEventsNodeStateSequence =
+ new LinkedList<>();
+
private class TestNodeListManagerEventDispatcher implements
EventHandler<NodesListManagerEvent> {
@Override
public void handle(NodesListManagerEvent event) {
nodesListManagerEvent = event;
+ nodesListManagerEventsNodeStateSequence.add(event.getNode().getState());
}
}
@@ -150,7 +155,7 @@ public class TestRMNodeTransitions {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
nodesListManagerEvent = null;
-
+ nodesListManagerEventsNodeStateSequence.clear();
}
@After
@@ -712,6 +717,8 @@ public class TestRMNodeTransitions {
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+ Assert.assertEquals(Arrays.asList(NodeState.NEW, NodeState.RUNNING),
+ nodesListManagerEventsNodeStateSequence);
Assert
.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
@@ -999,7 +1006,7 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
Assert.assertNotNull(nodesListManagerEvent);
- Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+ Assert.assertEquals(NodesListManagerEventType.NODE_DECOMMISSIONING,
nodesListManagerEvent.getType());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 7c5181487df..d81ad93de96 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -18,13 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -39,7 +32,21 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsSystem;
@@ -78,8 +85,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -96,17 +103,24 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestResourceTrackerService extends NodeLabelTestBase {
private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision");
- private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
+ private final File hostFile =
+ new File(TEMP_DIR + File.separator + "hostFile.txt");
private final File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
+ private final File excludeHostXmlFile =
+ new File(TEMP_DIR + File.separator + "excludeHostFile.xml");
private MockRM rm;
@@ -291,6 +305,68 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
}
+ @Ignore
+ @Test
+ public void testGracefulDecommissionDefaultTimeoutResolution()
+ throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile
+ .getAbsolutePath());
+
+ writeToHostsXmlFile(excludeHostXmlFile, Pair.<String, Integer>of("", null));
+ rm = new MockRM(conf);
+ rm.start();
+
+ int nodeMemory = 1024;
+ MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
+ MockNM nm2 = rm.registerNode("host2:5678", nodeMemory);
+ MockNM nm3 = rm.registerNode("host3:9101", nodeMemory);
+
+ NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+ NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+ NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+
+ Assert.assertTrue(
+ NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+ Assert.assertTrue(
+ NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
+ Assert.assertTrue(
+ NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
+
+ rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
+ rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
+ rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
+
+ // Graceful decommission both host1 and host2, with
+ // non default timeout for host1
+ final Integer nm1DecommissionTimeout = 20;
+ writeToHostsXmlFile(
+ excludeHostXmlFile,
+ Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout),
+ Pair.<String, Integer>of(nm2.getNodeId().getHost(), null));
+ rm.getNodesListManager().refreshNodes(conf, true);
+ rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
+ rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
+ Assert.assertEquals(
+ nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId()));
+ Integer defaultDecTimeout =
+ conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+ YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+ Assert.assertEquals(
+ defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId()));
+
+ // Graceful decommission host3 with a new default timeout
+ final Integer newDefaultDecTimeout = defaultDecTimeout + 10;
+ writeToHostsXmlFile(
+ excludeHostXmlFile, Pair.<String, Integer>of(nm3.getNodeId().getHost(), null));
+ conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+ newDefaultDecTimeout);
+ rm.getNodesListManager().refreshNodes(conf, true);
+ rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
+ Assert.assertEquals(
+ newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId()));
+ }
+
/**
* Graceful decommission node with running application.
*/
@@ -1965,16 +2041,20 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.stop();
}
+ private void ensureFileExists(File file) throws IOException {
+ if (!file.exists()) {
+ TEMP_DIR.mkdirs();
+ file.createNewFile();
+ }
+ }
+
private void writeToHostsFile(String... hosts) throws IOException {
writeToHostsFile(hostFile, hosts);
}
private void writeToHostsFile(File file, String... hosts)
throws IOException {
- if (!file.exists()) {
- TEMP_DIR.mkdirs();
- file.createNewFile();
- }
+ ensureFileExists(file);
FileOutputStream fStream = null;
try {
fStream = new FileOutputStream(file);
@@ -1990,6 +2070,33 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
}
+ private void writeToHostsXmlFile(
+ File file, Pair<String, Integer>... hostsAndTimeouts) throws Exception {
+ ensureFileExists(file);
+ DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+ Document doc = dbFactory.newDocumentBuilder().newDocument();
+ Element hosts = doc.createElement("hosts");
+ doc.appendChild(hosts);
+ for (Pair<String, Integer> hostsAndTimeout : hostsAndTimeouts) {
+ Element host = doc.createElement("host");
+ hosts.appendChild(host);
+ Element name = doc.createElement("name");
+ host.appendChild(name);
+ name.appendChild(doc.createTextNode(hostsAndTimeout.getLeft()));
+ if (hostsAndTimeout.getRight() != null) {
+ Element timeout = doc.createElement("timeout");
+ host.appendChild(timeout);
+ timeout.appendChild(
+ doc.createTextNode(hostsAndTimeout.getRight().toString())
+ );
+ }
+ }
+ TransformerFactory transformerFactory = TransformerFactory.newInstance();
+ Transformer transformer = transformerFactory.newTransformer();
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ transformer.transform(new DOMSource(doc), new StreamResult(file));
+ }
+
private void checkDecommissionedNMCount(MockRM rm, int count)
throws InterruptedException {
int waitCount = 0;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index ebd8636f765..81a396f26c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -163,7 +163,7 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+ public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index f9f0b746233..095d0027009 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.security.PrivilegedExceptionAction;
import java.util.List;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -31,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -39,9 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
public class TestAMRMRPCNodeUpdates {
private MockRM rm;
@@ -53,8 +54,8 @@ public class TestAMRMRPCNodeUpdates {
@Override
public void init(Configuration conf) {
conf.set(
- CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
- "1.0");
+ CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
+ "1.0");
super.init(conf);
}
};
@@ -62,19 +63,19 @@ public class TestAMRMRPCNodeUpdates {
rm.start();
amService = rm.getApplicationMasterService();
}
-
+
@After
public void tearDown() {
if (rm != null) {
this.rm.stop();
}
}
-
+
private void syncNodeHeartbeat(MockNM nm, boolean health) throws Exception {
nm.nodeHeartbeat(health);
rm.drainEvents();
}
-
+
private void syncNodeLost(MockNM nm) throws Exception {
rm.sendNodeStarted(nm);
rm.waitForState(nm.getNodeId(), NodeState.RUNNING);
@@ -82,13 +83,20 @@ public class TestAMRMRPCNodeUpdates {
rm.drainEvents();
}
+ private void syncNodeGracefulDecommission(
+ MockNM nm, int timeout) throws Exception {
+ rm.sendNodeGracefulDecommission(nm, timeout);
+ rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING);
+ rm.drainEvents();
+ }
+
private AllocateResponse allocate(final ApplicationAttemptId attemptId,
final AllocateRequest req) throws Exception {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token =
rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
- .getRMAppAttempt(attemptId).getAMRMToken();
+ .getRMAppAttempt(attemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
@Override
@@ -98,9 +106,42 @@ public class TestAMRMRPCNodeUpdates {
});
}
+ @Test
+ public void testAMRMDecommissioningNodes() throws Exception {
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
+ MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
+ rm.drainEvents();
+
+ RMApp app1 = rm.submitApp(2000);
+
+ // Trigger the scheduling so the AM gets 'launched' on nm1
+ nm1.nodeHeartbeat(true);
+
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+
+ // register AM returns no unusable node
+ am1.registerAppAttempt();
+
+ Integer decommissioningTimeout = 600;
+ syncNodeGracefulDecommission(nm2, decommissioningTimeout);
+
+ AllocateRequest allocateRequest1 =
+ AllocateRequest.newInstance(0, 0F, null, null, null);
+ AllocateResponse response1 =
+ allocate(attempt1.getAppAttemptId(), allocateRequest1);
+ List<NodeReport> updatedNodes = response1.getUpdatedNodes();
+ Assert.assertEquals(1, updatedNodes.size());
+ NodeReport nr = updatedNodes.iterator().next();
+ Assert.assertEquals(
+ decommissioningTimeout, nr.getDecommissioningTimeout());
+ Assert.assertEquals(
+ NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
+ }
+
@Test
public void testAMRMUnusableNodes() throws Exception {
-
+
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 10000);
@@ -114,7 +155,7 @@ public class TestAMRMRPCNodeUpdates {
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
-
+
// register AM returns no unusable node
am1.registerAppAttempt();
@@ -127,18 +168,20 @@ public class TestAMRMRPCNodeUpdates {
Assert.assertEquals(0, updatedNodes.size());
syncNodeHeartbeat(nm4, false);
-
+
// allocate request returns updated node
allocateRequest1 =
AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
- null);
+ null);
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
NodeReport nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
-
+ Assert.assertNull(nr.getDecommissioningTimeout());
+ Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
+
// resending the allocate request returns the same result
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
@@ -146,30 +189,34 @@ public class TestAMRMRPCNodeUpdates {
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
+ Assert.assertNull(nr.getDecommissioningTimeout());
+ Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
syncNodeLost(nm3);
-
+
// subsequent allocate request returns delta
allocateRequest1 =
AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
- null);
+ null);
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm3.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.LOST, nr.getNodeState());
-
+ Assert.assertNull(nr.getDecommissioningTimeout());
+ Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
+
// registering another AM gives it the complete failed list
RMApp app2 = rm.submitApp(2000);
// Trigger nm2 heartbeat so that AM gets launched on it
nm2.nodeHeartbeat(true);
RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
-
+
// register AM returns all unusable nodes
am2.registerAppAttempt();
-
+
// allocate request returns no updated node
AllocateRequest allocateRequest2 =
AllocateRequest.newInstance(0, 0F, null, null, null);
@@ -177,39 +224,43 @@ public class TestAMRMRPCNodeUpdates {
allocate(attempt2.getAppAttemptId(), allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
-
+
syncNodeHeartbeat(nm4, true);
-
+
// both AM's should get delta updated nodes
allocateRequest1 =
AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
- null);
+ null);
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
-
+ Assert.assertNull(nr.getDecommissioningTimeout());
+ Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
+
allocateRequest2 =
AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
- null);
+ null);
response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
+ Assert.assertNull(nr.getDecommissioningTimeout());
+ Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
// subsequent allocate calls should return no updated nodes
allocateRequest2 =
AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
- null);
+ null);
response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
-
+
// how to do the above for LOST node
-
+
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 330c5c672ca..caa7d1e2af0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
-import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -246,7 +246,7 @@ public class MockRMApp implements RMApp {
}
@Override
- public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+ public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
throw new UnsupportedOperationException("Not supported yet.");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org