You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/05/14 17:44:15 UTC
[12/50] [abbrv] hadoop git commit: YARN-8202. DefaultAMSProcessor
should properly check units of requested custom resource types against
minimum/maximum allocation (snemeth via rkanter)
YARN-8202. DefaultAMSProcessor should properly check units of requested custom resource types against minimum/maximum allocation (snemeth via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c8fa7cb6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c8fa7cb6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c8fa7cb6
Branch: refs/heads/HDDS-4
Commit: c8fa7cb6d0c12c4e65e53ea60167b856511b8294
Parents: 5c1c344
Author: Robert Kanter <rk...@apache.org>
Authored: Thu May 10 09:31:59 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Mon May 14 10:31:08 2018 -0700
----------------------------------------------------------------------
.../v2/app/rm/ContainerRequestCreator.java | 57 ++
.../v2/app/rm/TestRMContainerAllocator.java | 534 ++++++++++---------
.../hadoop/yarn/util/UnitsConversionUtil.java | 44 +-
.../resourcetypes/ResourceTypesTestHelper.java | 93 ++++
.../hadoop/yarn/server/utils/BuilderUtils.java | 8 +-
.../scheduler/SchedulerUtils.java | 95 +++-
.../TestApplicationMasterService.java | 185 +++++--
.../scheduler/TestSchedulerUtils.java | 278 +++++++++-
8 files changed, 961 insertions(+), 333 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
new file mode 100644
index 0000000..39a9ddc
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+final class ContainerRequestCreator {
+
+ private ContainerRequestCreator() {}
+
+ static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
+ Resource resource, String[] hosts) {
+ return createRequest(jobId, taskAttemptId, resource, hosts,
+ false, false);
+ }
+
+ static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
+ Resource resource, String[] hosts, boolean earlierFailedAttempt,
+ boolean reduce) {
+ final TaskId taskId;
+ if (reduce) {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+ } else {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ }
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+ taskAttemptId);
+
+ if (earlierFailedAttempt) {
+ return ContainerRequestEvent
+ .createContainerRequestEventForFailedContainer(attemptId,
+ resource);
+ }
+ return new ContainerRequestEvent(attemptId, resource, hosts,
+ new String[]{NetworkTopology.DEFAULT_RACK});
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 7875917..427e6ea 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
+import static org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestCreator.createRequest;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyFloat;
@@ -96,7 +97,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
@@ -203,7 +203,7 @@ public class TestRMContainerAllocator {
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
- MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -215,13 +215,13 @@ public class TestRMContainerAllocator {
rm.drainEvents();
// create the container request
- ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
- new String[] { "h1" });
+ ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(jobId,
+ 1, Resource.newInstance(1024, 1), new String[] {"h1"});
allocator.sendRequest(event1);
// send 1 more request with different resource req
- ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
- new String[] { "h2" });
+ ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(jobId,
+ 2, Resource.newInstance(1024, 1), new String[] {"h2"});
allocator.sendRequest(event2);
// this tells the scheduler about the requests
@@ -232,8 +232,8 @@ public class TestRMContainerAllocator {
Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
// send another request with different resource and priority
- ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
- new String[] { "h3" });
+ ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(jobId,
+ 3, Resource.newInstance(1024, 1), new String[] {"h3"});
allocator.sendRequest(event3);
// this tells the scheduler about the requests
@@ -242,7 +242,7 @@ public class TestRMContainerAllocator {
rm.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size());
-
+
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
nodeManager2.nodeHeartbeat(true); // Node heartbeat
@@ -252,21 +252,21 @@ public class TestRMContainerAllocator {
assigned = allocator.schedule();
rm.drainEvents();
Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size());
- checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+ checkAssignments(new ContainerRequestEvent[] {event1, event2, event3},
assigned, false);
-
+
// check that the assigned container requests are cancelled
allocator.schedule();
rm.drainEvents();
Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());
}
-
- @Test
+
+ @Test
public void testMapNodeLocality() throws Exception {
- // test checks that ordering of allocated containers list from the RM does
- // not affect the map->container assignment done by the AM. If there is a
- // node local container available for a map then it should be assigned to
- // that container and not a rack-local container that happened to be seen
+ // test checks that ordering of allocated containers list from the RM does
+ // not affect the map->container assignment done by the AM. If there is a
+ // node local container available for a map then it should be assigned to
+ // that container and not a rack-local container that happened to be seen
// earlier in the allocated containers list from the RM.
// Regression test for MAPREDUCE-4893
LOG.info("Running testMapNodeLocality");
@@ -291,26 +291,29 @@ public class TestRMContainerAllocator {
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
- MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
// add resources to scheduler
- MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps
+ MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps
rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node
MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
rm.drainEvents();
// create the container requests for maps
- ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
- new String[] { "h1" });
+ ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(
+ jobId, 1, Resource.newInstance(1024, 1),
+ new String[]{"h1"});
allocator.sendRequest(event1);
- ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
- new String[] { "h1" });
+ ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(
+ jobId, 2, Resource.newInstance(1024, 1),
+ new String[]{"h1"});
allocator.sendRequest(event2);
- ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
- new String[] { "h2" });
+ ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(
+ jobId, 3, Resource.newInstance(1024, 1),
+ new String[]{"h2"});
allocator.sendRequest(event3);
// this tells the scheduler about the requests
@@ -323,14 +326,14 @@ public class TestRMContainerAllocator {
// Node heartbeat from rack-local first. This makes node h3 the first in the
// list of allocated containers but it should not be assigned to task1.
nodeManager3.nodeHeartbeat(true);
- // Node heartbeat from node-local next. This allocates 2 node local
+ // Node heartbeat from node-local next. This allocates 2 node local
// containers for task1 and task2. These should be matched with those tasks.
nodeManager1.nodeHeartbeat(true);
rm.drainEvents();
assigned = allocator.schedule();
rm.drainEvents();
- checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+ checkAssignments(new ContainerRequestEvent[] {event1, event2, event3},
assigned, false);
// remove the rack-local assignment that should have happened for task3
for(TaskAttemptContainerAssignedEvent event : assigned) {
@@ -340,7 +343,7 @@ public class TestRMContainerAllocator {
break;
}
}
- checkAssignments(new ContainerRequestEvent[] { event1, event2},
+ checkAssignments(new ContainerRequestEvent[] {event1, event2},
assigned, true);
}
@@ -381,13 +384,15 @@ public class TestRMContainerAllocator {
rm.drainEvents();
// create the container request
- ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
- new String[] { "h1" });
+ ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(
+ jobId, 1, Resource.newInstance(1024, 1),
+ new String[] {"h1"});
allocator.sendRequest(event1);
// send 1 more request with different resource req
- ContainerRequestEvent event2 = createReq(jobId, 2, 2048,
- new String[] { "h2" });
+ ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(
+ jobId, 2, Resource.newInstance(1024, 1),
+ new String[] {"h2"});
allocator.sendRequest(event2);
// this tells the scheduler about the requests
@@ -404,7 +409,7 @@ public class TestRMContainerAllocator {
assigned = allocator.schedule();
rm.drainEvents();
- checkAssignments(new ContainerRequestEvent[] { event1, event2 },
+ checkAssignments(new ContainerRequestEvent[] {event1, event2},
assigned, false);
}
@@ -439,15 +444,19 @@ public class TestRMContainerAllocator {
rm.drainEvents();
// create the container request
- final String[] locations = new String[] { host };
- allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+ final String[] locations = new String[] {host};
+ allocator.sendRequest(createRequest(jobId, 0,
+ Resource.newInstance(1024, 1),
+ locations, false, true));
for (int i = 0; i < 1;) {
rm.drainEvents();
i += allocator.schedule().size();
nm.nodeHeartbeat(true);
}
- allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false));
+ allocator.sendRequest(createRequest(jobId, 0,
+ Resource.newInstance(1024, 1),
+ locations, true, false));
while (allocator.getTaskAttemptKillEvents().size() == 0) {
rm.drainEvents();
allocator.schedule().size();
@@ -494,9 +503,10 @@ public class TestRMContainerAllocator {
RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests();
ContainerRequestEvent event1 =
- createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+ createRequest(jobId, 1, Resource.newInstance(2048, 1),
+ new String[] {"h1"}, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
- new RMContainerRequestor.ContainerRequest(event1, null,null));
+ new RMContainerRequestor.ContainerRequest(event1, null, null));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
@@ -547,9 +557,12 @@ public class TestRMContainerAllocator {
RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests();
ContainerRequestEvent event1 =
- createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+ createRequest(jobId, 1,
+ Resource.newInstance(2048, 1),
+ new String[] {"h1"}, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
- new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+ new RMContainerRequestor.ContainerRequest(event1, null,
+ clock.getTime()));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
@@ -561,7 +574,7 @@ public class TestRMContainerAllocator {
clock.setTime(clock.getTime() + (preemptThreshold) * 1000);
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is not preeempted", 1,
- assignedRequests.preemptionWaitingReduces.size());
+ assignedRequests.preemptionWaitingReduces.size());
}
@Test(timeout = 30000)
@@ -608,9 +621,12 @@ public class TestRMContainerAllocator {
RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests();
ContainerRequestEvent event1 =
- createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+ createRequest(jobId, 1,
+ Resource.newInstance(2048, 1),
+ new String[] {"h1"}, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
- new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+ new RMContainerRequestor.ContainerRequest(event1, null,
+ clock.getTime()));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
@@ -651,13 +667,17 @@ public class TestRMContainerAllocator {
appAttemptId, mockJob, SystemClock.getInstance());
// request to allocate two reduce priority containers
- final String[] locations = new String[] { host };
- allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+ final String[] locations = new String[] {host};
+ allocator.sendRequest(createRequest(jobId, 0,
+ Resource.newInstance(1024, 1),
+ locations, false, true));
allocator.scheduleAllReduces();
allocator.makeRemoteRequest();
nm.nodeHeartbeat(true);
rm.drainEvents();
- allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
+ allocator.sendRequest(createRequest(jobId, 1,
+ Resource.newInstance(1024, 1),
+ locations, false, false));
int assignedContainer;
for (assignedContainer = 0; assignedContainer < 1;) {
@@ -684,7 +704,7 @@ public class TestRMContainerAllocator {
conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes");
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId =
- ApplicationAttemptId.newInstance(appId, 1);
+ ApplicationAttemptId.newInstance(appId, 1);
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
@@ -706,13 +726,16 @@ public class TestRMContainerAllocator {
// create some map requests
ContainerRequestEvent reqMapEvents;
- reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" });
+ reqMapEvents = ContainerRequestCreator.createRequest(jobId, 0,
+ Resource.newInstance(1024, 1), new String[]{"map"});
allocator.sendRequests(Arrays.asList(reqMapEvents));
// create some reduce requests
ContainerRequestEvent reqReduceEvents;
reqReduceEvents =
- createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true);
+ createRequest(jobId, 0,
+ Resource.newInstance(2048, 1),
+ new String[] {"reduce"}, false, true);
allocator.sendRequests(Arrays.asList(reqReduceEvents));
allocator.schedule();
// verify all of the host-specific asks were sent plus one for the
@@ -883,18 +906,21 @@ public class TestRMContainerAllocator {
// create the container request
// send MAP request
- ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] {
- "h1", "h2" }, true, false);
+ ContainerRequestEvent event1 = createRequest(jobId, 1,
+ Resource.newInstance(2048, 1),
+ new String[] {"h1", "h2"}, true, false);
allocator.sendRequest(event1);
// send REDUCE request
- ContainerRequestEvent event2 = createReq(jobId, 2, 3000,
- new String[] { "h1" }, false, true);
+ ContainerRequestEvent event2 = createRequest(jobId, 2,
+ Resource.newInstance(3000, 1),
+ new String[] {"h1"}, false, true);
allocator.sendRequest(event2);
// send MAP request
- ContainerRequestEvent event3 = createReq(jobId, 3, 2048,
- new String[] { "h3" }, false, false);
+ ContainerRequestEvent event3 = createRequest(jobId, 3,
+ Resource.newInstance(2048, 1),
+ new String[] {"h3"}, false, false);
allocator.sendRequest(event3);
// this tells the scheduler about the requests
@@ -911,7 +937,7 @@ public class TestRMContainerAllocator {
assigned = allocator.schedule();
rm.drainEvents();
- checkAssignments(new ContainerRequestEvent[] { event1, event3 },
+ checkAssignments(new ContainerRequestEvent[] {event1, event3},
assigned, false);
// validate that no container is assigned to h1 as it doesn't have 2048
@@ -921,10 +947,10 @@ public class TestRMContainerAllocator {
}
}
- private static class MyResourceManager extends MockRM {
+ static class MyResourceManager extends MockRM {
private static long fakeClusterTimeStamp = System.currentTimeMillis();
-
+
public MyResourceManager(Configuration conf) {
super(conf);
}
@@ -955,7 +981,7 @@ public class TestRMContainerAllocator {
protected ResourceScheduler createScheduler() {
return new MyFifoScheduler(this.getRMContext());
}
-
+
MyFifoScheduler getMyFifoScheduler() {
return (MyFifoScheduler) scheduler;
}
@@ -1221,7 +1247,7 @@ public class TestRMContainerAllocator {
Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
}
-
+
@Test
public void testUpdatedNodes() throws Exception {
Configuration conf = new Configuration();
@@ -1251,11 +1277,13 @@ public class TestRMContainerAllocator {
rm.drainEvents();
// create the map container request
- ContainerRequestEvent event = createReq(jobId, 1, 1024,
- new String[] { "h1" });
+ ContainerRequestEvent event =
+ ContainerRequestCreator.createRequest(jobId, 1,
+ Resource.newInstance(1024, 1),
+ new String[] {"h1"});
allocator.sendRequest(event);
TaskAttemptId attemptId = event.getAttemptID();
-
+
TaskAttempt mockTaskAttempt = mock(TaskAttempt.class);
when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId());
Task mockTask = mock(Task.class);
@@ -1279,7 +1307,7 @@ public class TestRMContainerAllocator {
// no updated nodes reported
Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
-
+
// mark nodes bad
nm1.nodeHeartbeat(false);
nm2.nodeHeartbeat(false);
@@ -1292,11 +1320,13 @@ public class TestRMContainerAllocator {
// updated nodes are reported
Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size());
- Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
- Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
+ Assert.assertEquals(2,
+ allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
+ Assert.assertEquals(attemptId,
+ allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
allocator.getJobUpdatedNodeEvents().clear();
allocator.getTaskAttemptKillEvents().clear();
-
+
assigned = allocator.schedule();
rm.drainEvents();
Assert.assertEquals(0, assigned.size());
@@ -1307,7 +1337,7 @@ public class TestRMContainerAllocator {
@Test
public void testBlackListedNodes() throws Exception {
-
+
LOG.info("Running testBlackListedNodes");
Configuration conf = new Configuration();
@@ -1315,7 +1345,7 @@ public class TestRMContainerAllocator {
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
-
+
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
@@ -1331,7 +1361,7 @@ public class TestRMContainerAllocator {
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
rm.drainEvents();
-
+
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
@@ -1347,18 +1377,24 @@ public class TestRMContainerAllocator {
rm.drainEvents();
// create the container request
- ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
- new String[] { "h1" });
+ ContainerRequestEvent event1 =
+ ContainerRequestCreator.createRequest(jobId, 1,
+ Resource.newInstance(1024, 1),
+ new String[] {"h1"});
allocator.sendRequest(event1);
// send 1 more request with different resource req
- ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
- new String[] { "h2" });
+ ContainerRequestEvent event2 =
+ ContainerRequestCreator.createRequest(jobId, 2,
+ Resource.newInstance(1024, 1),
+ new String[] {"h2"});
allocator.sendRequest(event2);
// send another request with different resource and priority
- ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
- new String[] { "h3" });
+ ContainerRequestEvent event3 =
+ ContainerRequestCreator.createRequest(jobId, 3,
+ Resource.newInstance(1024, 1),
+ new String[] {"h3"});
allocator.sendRequest(event3);
// this tells the scheduler about the requests
@@ -1368,9 +1404,9 @@ public class TestRMContainerAllocator {
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Send events to blacklist nodes h1 and h2
- ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+ ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
allocator.sendFailure(f1);
- ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);
+ ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);
allocator.sendFailure(f2);
// update resources in scheduler
@@ -1392,23 +1428,23 @@ public class TestRMContainerAllocator {
assigned = allocator.schedule();
rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
- Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
nodeManager3.nodeHeartbeat(true); // Node heartbeat
rm.drainEvents();
assigned = allocator.schedule();
rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
-
+
Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
-
+
// validate that all containers are assigned to h3
for (TaskAttemptContainerAssignedEvent assig : assigned) {
Assert.assertTrue("Assigned container host not correct", "h3".equals(assig
.getContainer().getNodeId().getHost()));
}
}
-
+
@Test
public void testIgnoreBlacklisting() throws Exception {
LOG.info("Running testIgnoreBlacklisting");
@@ -1448,7 +1484,7 @@ public class TestRMContainerAllocator {
// Known=1, blacklisted=0, ignore should be false - assign first container
assigned =
- getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
+ getContainerOnHost(jobId, 1, 1024, new String[] {"h1"},
nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
@@ -1463,47 +1499,47 @@ public class TestRMContainerAllocator {
// Because makeRemoteRequest will not be aware of it until next call
// The current call will send blacklisted node "h1" to RM
assigned =
- getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+ getContainerOnHost(jobId, 2, 1024, new String[] {"h1"},
nodeManagers[0], allocator, 1, 0, 0, 1, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Known=1, blacklisted=1, ignore should be true - assign 1
assigned =
- getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+ getContainerOnHost(jobId, 2, 1024, new String[] {"h1"},
nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
// Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
assigned =
- getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
+ getContainerOnHost(jobId, 3, 1024, new String[] {"h2"},
nodeManagers[1], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
// Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
assigned =
- getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
+ getContainerOnHost(jobId, 4, 1024, new String[] {"h3"},
nodeManagers[2], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Known=3, blacklisted=1, ignore should be true - assign 1
assigned =
- getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
+ getContainerOnHost(jobId, 5, 1024, new String[] {"h1"},
nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
// Known=4, blacklisted=1, ignore should be false - assign 1 anyway
assigned =
- getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
+ getContainerOnHost(jobId, 6, 1024, new String[] {"h4"},
nodeManagers[3], allocator, 0, 0, 1, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Test blacklisting re-enabled.
// Known=4, blacklisted=1, ignore should be false - no assignment on h1
assigned =
- getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
+ getContainerOnHost(jobId, 7, 1024, new String[] {"h1"},
nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// RMContainerRequestor would have created a replacement request.
@@ -1516,20 +1552,20 @@ public class TestRMContainerAllocator {
// Known=4, blacklisted=2, ignore should be true. Should assign 0
// container for the same reason above.
assigned =
- getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+ getContainerOnHost(jobId, 8, 1024, new String[] {"h1"},
nodeManagers[0], allocator, 1, 0, 0, 2, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Known=4, blacklisted=2, ignore should be true. Should assign 2
// containers.
assigned =
- getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+ getContainerOnHost(jobId, 8, 1024, new String[] {"h1"},
nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
// Known=4, blacklisted=2, ignore should be true.
assigned =
- getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
+ getContainerOnHost(jobId, 9, 1024, new String[] {"h2"},
nodeManagers[1], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
@@ -1540,23 +1576,23 @@ public class TestRMContainerAllocator {
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
// Known=5, blacklisted=3, ignore should be true.
assigned =
- getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
+ getContainerOnHost(jobId, 10, 1024, new String[] {"h3"},
nodeManagers[2], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
-
+
// Assign on 5 more nodes - to re-enable blacklisting
for (int i = 0; i < 5; i++) {
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
assigned =
getContainerOnHost(jobId, 11 + i, 1024,
- new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
+ new String[] {String.valueOf(5 + i)}, nodeManagers[4 + i],
allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
}
// Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
assigned =
- getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
+ getContainerOnHost(jobId, 20, 1024, new String[] {"h3"},
nodeManagers[2], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
}
@@ -1576,7 +1612,8 @@ public class TestRMContainerAllocator {
int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
throws Exception {
ContainerRequestEvent reqEvent =
- createReq(jobId, taskAttemptId, memory, hosts);
+ ContainerRequestCreator.createRequest(jobId, taskAttemptId,
+ Resource.newInstance(memory, 1), hosts);
allocator.sendRequest(reqEvent);
// Send the request to the RM
@@ -1596,7 +1633,7 @@ public class TestRMContainerAllocator {
expectedAdditions2, expectedRemovals2, rm);
return assigned;
}
-
+
@Test
public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
@@ -1606,7 +1643,7 @@ public class TestRMContainerAllocator {
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
-
+
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
@@ -1622,7 +1659,7 @@ public class TestRMContainerAllocator {
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
rm.drainEvents();
-
+
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
@@ -1638,8 +1675,10 @@ public class TestRMContainerAllocator {
LOG.info("Requesting 1 Containers _1 on H1");
// create the container request
- ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
- new String[] { "h1" });
+ ContainerRequestEvent event1 =
+ ContainerRequestCreator.createRequest(jobId, 1,
+ Resource.newInstance(1024, 1),
+ new String[] {"h1"});
allocator.sendRequest(event1);
LOG.info("RM Heartbeat (to send the container requests)");
@@ -1653,13 +1692,13 @@ public class TestRMContainerAllocator {
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
rm.drainEvents();
-
+
LOG.info("RM Heartbeat (To process the scheduled containers)");
assigned = allocator.schedule();
rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
- Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
-
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
LOG.info("Failing container _1 on H1 (should blacklist the node)");
// Send events to blacklist nodes h1 and h2
ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
@@ -1667,8 +1706,9 @@ public class TestRMContainerAllocator {
//At this stage, a request should be created for a fast fail map
//Create a FAST_FAIL request for a previously failed map.
- ContainerRequestEvent event1f = createReq(jobId, 1, 1024,
- new String[] { "h1" }, true, false);
+ ContainerRequestEvent event1f = createRequest(jobId, 1,
+ Resource.newInstance(1024, 1),
+ new String[] {"h1"}, true, false);
allocator.sendRequest(event1f);
//Update the Scheduler with the new requests.
@@ -1678,24 +1718,26 @@ public class TestRMContainerAllocator {
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// send another request with different resource and priority
- ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
- new String[] { "h1", "h3" });
+ ContainerRequestEvent event3 =
+ ContainerRequestCreator.createRequest(jobId, 3,
+ Resource.newInstance(1024, 1),
+ new String[] {"h1", "h3"});
allocator.sendRequest(event3);
-
+
//Allocator is aware of prio:5 container, and prio:20 (h1+h3) container.
//RM is only aware of the prio:5 container
-
+
LOG.info("h1 Heartbeat (To actually schedule the containers)");
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
rm.drainEvents();
-
+
LOG.info("RM Heartbeat (To process the scheduled containers)");
assigned = allocator.schedule();
rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
- Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
//RMContainerAllocator gets assigned a p:5 on a blacklisted node.
//Send a release for the p:5 container + another request.
@@ -1704,26 +1746,26 @@ public class TestRMContainerAllocator {
rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-
+
//Hearbeat from H3 to schedule on this host.
LOG.info("h3 Heartbeat (To re-schedule the containers)");
nodeManager3.nodeHeartbeat(true); // Node heartbeat
rm.drainEvents();
-
+
LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
assigned = allocator.schedule();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
rm.drainEvents();
-
+
// For debugging
for (TaskAttemptContainerAssignedEvent assig : assigned) {
LOG.info(assig.getTaskAttemptID() +
" assgined to " + assig.getContainer().getId() +
" with priority " + assig.getContainer().getPriority());
}
-
+
Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
-
+
// validate that all containers are assigned to h3
for (TaskAttemptContainerAssignedEvent assig : assigned) {
Assert.assertEquals("Assigned container " + assig.getContainer().getId()
@@ -1759,13 +1801,13 @@ public class TestRMContainerAllocator {
assert (false);
}
}
-
+
List<ResourceRequest> lastAsk = null;
List<ContainerId> lastRelease = null;
List<String> lastBlacklistAdditions;
List<String> lastBlacklistRemovals;
Resource forceResourceLimit = null;
-
+
// override this to copy the objects otherwise FifoScheduler updates the
// numContainers in same objects as kept by RMContainerAllocator
@Override
@@ -1855,38 +1897,6 @@ public class TestRMContainerAllocator {
}
}
- private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
- int memory, String[] hosts) {
- return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false);
- }
-
- private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
- int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
- return createReq(jobId, taskAttemptId, mem,
- 1, hosts, earlierFailedAttempt, reduce);
- }
-
- private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
- int memory, int vcore, String[] hosts, boolean earlierFailedAttempt,
- boolean reduce) {
- TaskId taskId;
- if (reduce) {
- taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
- } else {
- taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
- }
- TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
- taskAttemptId);
- Resource containerNeed = Resource.newInstance(memory, vcore);
- if (earlierFailedAttempt) {
- return ContainerRequestEvent
- .createContainerRequestEventForFailedContainer(attemptId,
- containerNeed);
- }
- return new ContainerRequestEvent(attemptId, containerNeed, hosts,
- new String[] { NetworkTopology.DEFAULT_RACK });
- }
-
private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
String host, boolean reduce) {
TaskId taskId;
@@ -1897,9 +1907,9 @@ public class TestRMContainerAllocator {
}
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
taskAttemptId);
- return new ContainerFailedEvent(attemptId, host);
+ return new ContainerFailedEvent(attemptId, host);
}
-
+
private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
int taskAttemptId, boolean reduce) {
TaskId taskId;
@@ -1957,14 +1967,14 @@ public class TestRMContainerAllocator {
// Mock RMContainerAllocator
// Instead of talking to remote Scheduler,uses the local Scheduler
- private static class MyContainerAllocator extends RMContainerAllocator {
- static final List<TaskAttemptContainerAssignedEvent> events
- = new ArrayList<TaskAttemptContainerAssignedEvent>();
- static final List<TaskAttemptKillEvent> taskAttemptKillEvents
- = new ArrayList<TaskAttemptKillEvent>();
- static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents
- = new ArrayList<JobUpdatedNodesEvent>();
- static final List<JobEvent> jobEvents = new ArrayList<JobEvent>();
+ static class MyContainerAllocator extends RMContainerAllocator {
+ static final List<TaskAttemptContainerAssignedEvent> events =
+ new ArrayList<>();
+ static final List<TaskAttemptKillEvent> taskAttemptKillEvents =
+ new ArrayList<>();
+ static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents =
+ new ArrayList<>();
+ static final List<JobEvent> jobEvents = new ArrayList<>();
private MyResourceManager rm;
private boolean isUnregistered = false;
private AllocateResponse allocateResponse;
@@ -2069,7 +2079,7 @@ public class TestRMContainerAllocator {
}
public void sendRequest(ContainerRequestEvent req) {
- sendRequests(Arrays.asList(new ContainerRequestEvent[] { req }));
+ sendRequests(Arrays.asList(new ContainerRequestEvent[] {req}));
}
public void sendRequests(List<ContainerRequestEvent> reqs) {
@@ -2081,7 +2091,7 @@ public class TestRMContainerAllocator {
public void sendFailure(ContainerFailedEvent f) {
super.handleEvent(f);
}
-
+
public void sendDeallocate(ContainerAllocatorEvent f) {
super.handleEvent(f);
}
@@ -2099,16 +2109,15 @@ public class TestRMContainerAllocator {
// run the scheduler
super.heartbeat();
- List<TaskAttemptContainerAssignedEvent> result
- = new ArrayList<TaskAttemptContainerAssignedEvent>(events);
+ List<TaskAttemptContainerAssignedEvent> result = new ArrayList<>(events);
events.clear();
return result;
}
-
+
static List<TaskAttemptKillEvent> getTaskAttemptKillEvents() {
return taskAttemptKillEvents;
}
-
+
static List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() {
return jobUpdatedNodeEvents;
}
@@ -2117,12 +2126,12 @@ public class TestRMContainerAllocator {
protected void startAllocatorThread() {
// override to NOT start thread
}
-
+
@Override
protected boolean isApplicationMasterRegistered() {
return super.isApplicationMasterRegistered();
}
-
+
public boolean isUnregistered() {
return isUnregistered;
}
@@ -2164,7 +2173,7 @@ public class TestRMContainerAllocator {
int numPendingReduces = 4;
float maxReduceRampupLimit = 0.5f;
float reduceSlowStart = 0.2f;
-
+
RMContainerAllocator allocator = mock(RMContainerAllocator.class);
doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(),
anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class),
@@ -2174,14 +2183,14 @@ public class TestRMContainerAllocator {
// Test slow-start
allocator.scheduleReduces(
- totalMaps, succeededMaps,
- scheduledMaps, scheduledReduces,
- assignedMaps, assignedReduces,
- mapResourceReqt, reduceResourceReqt,
- numPendingReduces,
+ totalMaps, succeededMaps,
+ scheduledMaps, scheduledReduces,
+ assignedMaps, assignedReduces,
+ mapResourceReqt, reduceResourceReqt,
+ numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, never()).setIsReduceStarted(true);
-
+
// verify slow-start still in effect when no more maps need to
// be scheduled but some have yet to complete
allocator.scheduleReduces(
@@ -2197,23 +2206,23 @@ public class TestRMContainerAllocator {
succeededMaps = 3;
doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit();
allocator.scheduleReduces(
- totalMaps, succeededMaps,
- scheduledMaps, scheduledReduces,
- assignedMaps, assignedReduces,
- mapResourceReqt, reduceResourceReqt,
- numPendingReduces,
+ totalMaps, succeededMaps,
+ scheduledMaps, scheduledReduces,
+ assignedMaps, assignedReduces,
+ mapResourceReqt, reduceResourceReqt,
+ numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, times(1)).setIsReduceStarted(true);
-
+
// Test reduce ramp-up
doReturn(BuilderUtils.newResource(100 * 1024, 100 * 1)).when(allocator)
.getResourceLimit();
allocator.scheduleReduces(
- totalMaps, succeededMaps,
- scheduledMaps, scheduledReduces,
- assignedMaps, assignedReduces,
- mapResourceReqt, reduceResourceReqt,
- numPendingReduces,
+ totalMaps, succeededMaps,
+ scheduledMaps, scheduledReduces,
+ assignedMaps, assignedReduces,
+ mapResourceReqt, reduceResourceReqt,
+ numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampUpReduces(anyInt());
verify(allocator, never()).rampDownReduces(anyInt());
@@ -2232,18 +2241,18 @@ public class TestRMContainerAllocator {
verify(allocator).rampDownReduces(anyInt());
// Test reduce ramp-down for when there are scheduled maps
- // Since we have two scheduled Maps, rampDownReducers
+ // Since we have two scheduled Maps, rampDownReducers
// should be invoked twice.
scheduledMaps = 2;
assignedReduces = 2;
doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator)
.getResourceLimit();
allocator.scheduleReduces(
- totalMaps, succeededMaps,
- scheduledMaps, scheduledReduces,
- assignedMaps, assignedReduces,
- mapResourceReqt, reduceResourceReqt,
- numPendingReduces,
+ totalMaps, succeededMaps,
+ scheduledMaps, scheduledReduces,
+ assignedMaps, assignedReduces,
+ mapResourceReqt, reduceResourceReqt,
+ numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, times(2)).rampDownReduces(anyInt());
@@ -2288,7 +2297,7 @@ public class TestRMContainerAllocator {
recalculatedReduceSchedule = true;
}
}
-
+
@Test
public void testCompletedTasksRecalculateSchedule() throws Exception {
LOG.info("Running testCompletedTasksRecalculateSchedule");
@@ -2400,31 +2409,33 @@ public class TestRMContainerAllocator {
RMContainerAllocator allocator = new RMContainerAllocator(
mock(ClientService.class), mock(AppContext.class),
new NoopAMPreemptionPolicy());
-
+
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
MRBuilderUtils.newTaskId(
MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
ApplicationId applicationId = ApplicationId.newInstance(1, 1);
- ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(
- applicationId, 1);
- ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, 1);
+ ApplicationAttemptId applicationAttemptId =
+ ApplicationAttemptId.newInstance(applicationId, 1);
+ ContainerId containerId =
+ ContainerId.newContainerId(applicationAttemptId, 1);
ContainerStatus status = ContainerStatus.newInstance(
containerId, ContainerState.RUNNING, "", 0);
ContainerStatus abortedStatus = ContainerStatus.newInstance(
containerId, ContainerState.RUNNING, "",
ContainerExitStatus.ABORTED);
-
+
TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
event.getType());
-
+
TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
abortedStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
-
- ContainerId containerId2 = ContainerId.newContainerId(applicationAttemptId, 2);
+
+ ContainerId containerId2 =
+ ContainerId.newContainerId(applicationAttemptId, 2);
ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
ContainerState.RUNNING, "", 0);
@@ -2440,7 +2451,7 @@ public class TestRMContainerAllocator {
preemptedStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
}
-
+
@Test
public void testUnregistrationOnlyIfRegistered() throws Exception {
Configuration conf = new Configuration();
@@ -2483,7 +2494,7 @@ public class TestRMContainerAllocator {
mrApp.stop();
Assert.assertTrue(allocator.isUnregistered());
}
-
+
// Step-1 : AM send allocate request for 2 ContainerRequests and 1
// blackListeNode
// Step-2 : 2 containers are allocated by RM.
@@ -2542,11 +2553,15 @@ public class TestRMContainerAllocator {
// create the container request
// send MAP request
ContainerRequestEvent event1 =
- createReq(jobId, 1, 1024, new String[] { "h1" });
+ ContainerRequestCreator.createRequest(jobId, 1,
+ Resource.newInstance(1024, 1),
+ new String[]{"h1"});
allocator.sendRequest(event1);
ContainerRequestEvent event2 =
- createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+ ContainerRequestCreator.createRequest(jobId, 2,
+ Resource.newInstance(2048, 1),
+ new String[] {"h1", "h2"});
allocator.sendRequest(event2);
// Send events to blacklist h2
@@ -2584,7 +2599,9 @@ public class TestRMContainerAllocator {
// RM
// send container request
ContainerRequestEvent event3 =
- createReq(jobId, 3, 1000, new String[] { "h1" });
+ ContainerRequestCreator.createRequest(jobId, 3,
+ Resource.newInstance(1000, 1),
+ new String[]{"h1"});
allocator.sendRequest(event3);
// send deallocate request
@@ -2628,7 +2645,9 @@ public class TestRMContainerAllocator {
allocator.sendFailure(f2);
ContainerRequestEvent event4 =
- createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+ ContainerRequestCreator.createRequest(jobId, 4,
+ Resource.newInstance(2000, 1),
+ new String[]{"h1", "h2"});
allocator.sendRequest(event4);
// send allocate request to 2nd RM and get resync command
@@ -2639,7 +2658,9 @@ public class TestRMContainerAllocator {
// asks,release,blacklistAaddition
// and another containerRequest(event5)
ContainerRequestEvent event5 =
- createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+ ContainerRequestCreator.createRequest(jobId, 5,
+ Resource.newInstance(3000, 1),
+ new String[]{"h1", "h2", "h3"});
allocator.sendRequest(event5);
// send all outstanding request again.
@@ -2696,9 +2717,10 @@ public class TestRMContainerAllocator {
}
};
- ContainerRequestEvent mapRequestEvt = createReq(jobId, 0,
- (int) (maxContainerSupported.getMemorySize() + 10),
- maxContainerSupported.getVirtualCores(),
+ final int memory = (int) (maxContainerSupported.getMemorySize() + 10);
+ ContainerRequestEvent mapRequestEvt = createRequest(jobId, 0,
+ Resource.newInstance(memory,
+ maxContainerSupported.getVirtualCores()),
new String[0], false, false);
allocator.sendRequests(Arrays.asList(mapRequestEvt));
allocator.schedule();
@@ -2734,10 +2756,11 @@ public class TestRMContainerAllocator {
}
};
- ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0,
- (int) (maxContainerSupported.getMemorySize() + 10),
- maxContainerSupported.getVirtualCores(),
- new String[0], false, true);
+ final int memory = (int) (maxContainerSupported.getMemorySize() + 10);
+ ContainerRequestEvent reduceRequestEvt = createRequest(jobId, 0,
+ Resource.newInstance(memory,
+ maxContainerSupported.getVirtualCores()),
+ new String[0], false, true);
allocator.sendRequests(Arrays.asList(reduceRequestEvt));
// Reducer container requests are added to the pending queue upon request,
// schedule all reducers here so that we can observe if reducer requests
@@ -2787,8 +2810,9 @@ public class TestRMContainerAllocator {
rm1.drainEvents();
Assert.assertEquals("Should Have 1 Job Event", 1,
allocator.jobEvents.size());
- JobEvent event = allocator.jobEvents.get(0);
- Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT));
+ JobEvent event = allocator.jobEvents.get(0);
+ Assert.assertTrue("Should Reboot",
+ event.getType().equals(JobEventType.JOB_AM_REBOOT));
}
@Test(timeout=60000)
@@ -2920,7 +2944,9 @@ public class TestRMContainerAllocator {
// create some map requests
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
for (int i = 0; i < reqMapEvents.length; ++i) {
- reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+ reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i,
+ Resource.newInstance(1024, 1),
+ new String[] {"h" + i});
}
allocator.sendRequests(Arrays.asList(reqMapEvents));
// create some reduce requests
@@ -2928,7 +2954,8 @@ public class TestRMContainerAllocator {
new ContainerRequestEvent[REDUCE_COUNT];
for (int i = 0; i < reqReduceEvents.length; ++i) {
reqReduceEvents[i] =
- createReq(jobId, i, 1024, new String[] {}, false, true);
+ createRequest(jobId, i, Resource.newInstance(1024, 1),
+ new String[] {}, false, true);
}
allocator.sendRequests(Arrays.asList(reqReduceEvents));
allocator.schedule();
@@ -2975,14 +3002,17 @@ public class TestRMContainerAllocator {
// create some map requests
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
for (int i = 0; i < reqMapEvents.length; ++i) {
- reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+ reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i,
+ Resource.newInstance(1024, 1), new String[] {"h" + i});
}
allocator.sendRequests(Arrays.asList(reqMapEvents));
// create some reduce requests
- ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT];
+ ContainerRequestEvent[] reqReduceEvents =
+ new ContainerRequestEvent[REDUCE_COUNT];
for (int i = 0; i < reqReduceEvents.length; ++i) {
- reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
- false, true);
+ reqReduceEvents[i] =
+ createRequest(jobId, i, Resource.newInstance(1024, 1),
+ new String[] {}, false, true);
}
allocator.sendRequests(Arrays.asList(reqReduceEvents));
allocator.schedule();
@@ -3137,13 +3167,19 @@ public class TestRMContainerAllocator {
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 =
- createReq(jobId, 1, 1024, new String[] { "h1" });
+ ContainerRequestCreator.createRequest(jobId, 1,
+ Resource.newInstance(1024, 1),
+ new String[]{"h1"});
allocator.sendRequest(event1);
ContainerRequestEvent event2 =
- createReq(jobId, 2, 1024, new String[] { "h2" });
+ ContainerRequestCreator.createRequest(jobId, 2,
+ Resource.newInstance(1024, 1),
+ new String[]{"h2"});
allocator.sendRequest(event2);
ContainerRequestEvent event3 =
- createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+ createRequest(jobId, 3,
+ Resource.newInstance(1024, 1),
+ new String[]{"h2"}, false, true);
allocator.sendRequest(event3);
// This will tell the scheduler about the requests but there will be no
@@ -3156,7 +3192,8 @@ public class TestRMContainerAllocator {
// Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 =
- createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+ createRequest(jobId, 4, Resource.newInstance(1024, 1),
+ new String[] {"h3"}, false, true);
allocator.sendRequest(event4);
allocator.schedule();
@@ -3301,13 +3338,18 @@ public class TestRMContainerAllocator {
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 =
- createReq(jobId, 1, 1024, new String[] { "h1" });
+ ContainerRequestCreator.createRequest(jobId, 1,
+ Resource.newInstance(1024, 1),
+ new String[]{"h1"});
allocator.sendRequest(event1);
ContainerRequestEvent event2 =
- createReq(jobId, 2, 1024, new String[] { "h2" });
+ ContainerRequestCreator.createRequest(jobId, 2,
+ Resource.newInstance(1024, 1),
+ new String[]{"h2"});
allocator.sendRequest(event2);
ContainerRequestEvent event3 =
- createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+ createRequest(jobId, 3, Resource.newInstance(1024, 1),
+ new String[]{"h2"}, false, true);
allocator.sendRequest(event3);
// This will tell the scheduler about the requests but there will be no
@@ -3320,7 +3362,8 @@ public class TestRMContainerAllocator {
// Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 =
- createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+ createRequest(jobId, 4, Resource.newInstance(1024, 1),
+ new String[]{"h3"}, false, true);
allocator.sendRequest(event4);
allocator.schedule();
@@ -3433,13 +3476,19 @@ public class TestRMContainerAllocator {
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 =
- createReq(jobId, 1, 1024, new String[] { "h1" });
+ ContainerRequestCreator.createRequest(jobId, 1,
+ Resource.newInstance(1024, 1),
+ new String[]{"h1"});
allocator.sendRequest(event1);
ContainerRequestEvent event2 =
- createReq(jobId, 2, 1024, new String[] { "h2" });
+ ContainerRequestCreator.createRequest(jobId, 2,
+ Resource.newInstance(1024, 1),
+ new String[]{"h2"});
allocator.sendRequest(event2);
ContainerRequestEvent event3 =
- createReq(jobId, 3, 1024, new String[] { "h1" }, false, true);
+ createRequest(jobId, 3,
+ Resource.newInstance(1024, 1),
+ new String[]{"h1"}, false, true);
allocator.sendRequest(event3);
// This will tell the scheduler about the requests but there will be no
@@ -3449,7 +3498,8 @@ public class TestRMContainerAllocator {
// Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 =
- createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+ createRequest(jobId, 4, Resource.newInstance(1024, 1),
+ new String[] {"h3"}, false, true);
allocator.sendRequest(event4);
allocator.schedule();
@@ -3486,7 +3536,9 @@ public class TestRMContainerAllocator {
// Send request for one more mapper.
ContainerRequestEvent event5 =
- createReq(jobId, 5, 1024, new String[] { "h1" });
+ ContainerRequestCreator.createRequest(jobId, 5,
+ Resource.newInstance(1024, 1),
+ new String[]{"h1"});
allocator.sendRequest(event5);
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
@@ -3528,7 +3580,7 @@ public class TestRMContainerAllocator {
return RegisterApplicationMasterResponse.newInstance(
Resource.newInstance(512, 1),
Resource.newInstance(512000, 1024),
- Collections.<ApplicationAccessType,String>emptyMap(),
+ Collections.emptyMap(),
ByteBuffer.wrap("fake_key".getBytes()),
Collections.<Container>emptyList(),
"default",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
index 7a212e1..1da2fed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
@@ -175,16 +175,8 @@ public class UnitsConversionUtil {
*/
public static int compare(String unitA, long valueA, String unitB,
long valueB) {
- if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA)
- || !KNOWN_UNITS.contains(unitB)) {
- throw new IllegalArgumentException("Units cannot be null");
- }
- if (!KNOWN_UNITS.contains(unitA)) {
- throw new IllegalArgumentException("Unknown unit '" + unitA + "'");
- }
- if (!KNOWN_UNITS.contains(unitB)) {
- throw new IllegalArgumentException("Unknown unit '" + unitB + "'");
- }
+ checkUnitArgument(unitA);
+ checkUnitArgument(unitB);
if (unitA.equals(unitB)) {
return Long.compare(valueA, valueB);
}
@@ -218,4 +210,36 @@ public class UnitsConversionUtil {
return tmpA.compareTo(tmpB);
}
}
+
+ private static void checkUnitArgument(String unit) {
+ if (unit == null) {
+ throw new IllegalArgumentException("Unit cannot be null");
+ } else if (!KNOWN_UNITS.contains(unit)) {
+ throw new IllegalArgumentException("Unknown unit '" + unit + "'");
+ }
+ }
+
+ /**
+ * Compare a unit to another unit.
+ * <br>
+ * Examples:<br>
+ * 1. 'm' (milli) is smaller than 'k' (kilo), so compareUnits("m", "k")
+ * will return -1.<br>
+ * 2. 'M' (MEGA) is greater than 'k' (kilo), so compareUnits("M", "k") will
+ * return 1.
+ *
+ * @param unitA first unit
+ * @param unitB second unit
+ * @return +1, 0 or -1 depending on whether the relationship between units
+ * is smaller than,
+ * equal to or lesser than.
+ */
+ public static int compareUnits(String unitA, String unitB) {
+ checkUnitArgument(unitA);
+ checkUnitArgument(unitB);
+ int unitAPos = SORTED_UNITS.indexOf(unitA);
+ int unitBPos = SORTED_UNITS.indexOf(unitB);
+
+ return Integer.compare(unitAPos, unitBPos);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
new file mode 100644
index 0000000..98a8a00
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.resourcetypes;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Contains helper methods to create Resource and ResourceInformation objects.
+ * ResourceInformation can be created from a resource name
+ * and a resource descriptor as well that comprises amount and unit.
+ */
+public final class ResourceTypesTestHelper {
+
+ private static final Pattern RESOURCE_VALUE_AND_UNIT_PATTERN =
+ Pattern.compile("(\\d+)([A-za-z]*)");
+
+ private ResourceTypesTestHelper() {}
+
+ private static final RecordFactory RECORD_FACTORY = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ private static final class ResourceValueAndUnit {
+ private final Long value;
+ private final String unit;
+
+ private ResourceValueAndUnit(Long value, String unit) {
+ this.value = value;
+ this.unit = unit;
+ }
+ }
+
+ public static Resource newResource(long memory, int vCores, Map<String,
+ String> customResources) {
+ Resource resource = RECORD_FACTORY.newRecordInstance(Resource.class);
+ resource.setMemorySize(memory);
+ resource.setVirtualCores(vCores);
+
+ for (Map.Entry<String, String> customResource :
+ customResources.entrySet()) {
+ String resourceName = customResource.getKey();
+ ResourceInformation resourceInformation =
+ createResourceInformation(resourceName,
+ customResource.getValue());
+ resource.setResourceInformation(resourceName, resourceInformation);
+ }
+ return resource;
+ }
+
+ public static ResourceInformation createResourceInformation(String
+ resourceName, String descriptor) {
+ ResourceValueAndUnit resourceValueAndUnit =
+ getResourceValueAndUnit(descriptor);
+ return ResourceInformation
+ .newInstance(resourceName, resourceValueAndUnit.unit,
+ resourceValueAndUnit.value);
+ }
+
+ private static ResourceValueAndUnit getResourceValueAndUnit(String val) {
+ Matcher matcher = RESOURCE_VALUE_AND_UNIT_PATTERN.matcher(val);
+ if (!matcher.find()) {
+ throw new RuntimeException("Invalid pattern of resource descriptor: " +
+ val);
+ } else if (matcher.groupCount() != 2) {
+ throw new RuntimeException("Capturing group count in string " +
+ val + " is not 2!");
+ }
+ long value = Long.parseLong(matcher.group(1));
+
+ return new ResourceValueAndUnit(value, matcher.group(2));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 0de834c..e06b55e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -183,7 +183,7 @@ public class BuilderUtils {
public static NodeId newNodeId(String host, int port) {
return NodeId.newInstance(host, port);
}
-
+
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime) {
@@ -422,7 +422,7 @@ public class BuilderUtils {
report.setPriority(priority);
return report;
}
-
+
public static ApplicationSubmissionContext newApplicationSubmissionContext(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
@@ -477,6 +477,10 @@ public class BuilderUtils {
return resource;
}
+ public static Resource newEmptyResource() {
+ return recordFactory.newRecordInstance(Resource.class);
+ }
+
public static URL newURL(String scheme, String host, int port, String file) {
URL url = recordFactory.newRecordInstance(URL.class);
url.setScheme(scheme);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index c0d7d86..9b3c20a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -283,24 +284,10 @@ public class SchedulerUtils {
private static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
throws InvalidResourceRequestException {
- Resource requestedResource = resReq.getCapability();
- for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
- ResourceInformation reqRI = requestedResource.getResourceInformation(i);
- ResourceInformation maxRI = maximumResource.getResourceInformation(i);
- if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) {
- throw new InvalidResourceRequestException(
- "Invalid resource request, requested resource type=[" + reqRI
- .getName()
- + "] < 0 or greater than maximum allowed allocation. Requested "
- + "resource=" + requestedResource
- + ", maximum allowed allocation=" + maximumResource
- + ", please note that maximum allowed allocation is calculated "
- + "by scheduler based on maximum resource of registered "
- + "NodeManagers, which might be less than configured "
- + "maximum allocation=" + ResourceUtils
- .getResourceTypesMaximumAllocation());
- }
- }
+ final Resource requestedResource = resReq.getCapability();
+ checkResourceRequestAgainstAvailableResource(requestedResource,
+ maximumResource);
+
String labelExp = resReq.getNodeLabelExpression();
// we don't allow specify label expression other than resourceName=ANY now
if (!ResourceRequest.ANY.equals(resReq.getResourceName())
@@ -338,6 +325,78 @@ public class SchedulerUtils {
}
}
+ @Private
+ @VisibleForTesting
+ static void checkResourceRequestAgainstAvailableResource(Resource reqResource,
+ Resource availableResource) throws InvalidResourceRequestException {
+ for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
+ final ResourceInformation requestedRI =
+ reqResource.getResourceInformation(i);
+ final String reqResourceName = requestedRI.getName();
+
+ if (requestedRI.getValue() < 0) {
+ throwInvalidResourceException(reqResource, availableResource,
+ reqResourceName);
+ }
+
+ final ResourceInformation availableRI =
+ availableResource.getResourceInformation(reqResourceName);
+
+ long requestedResourceValue = requestedRI.getValue();
+ long availableResourceValue = availableRI.getValue();
+ int unitsRelation = UnitsConversionUtil
+ .compareUnits(requestedRI.getUnits(), availableRI.getUnits());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Requested resource information: " + requestedRI);
+ LOG.debug("Available resource information: " + availableRI);
+ LOG.debug("Relation of units: " + unitsRelation);
+ }
+
+ // requested resource unit is less than available resource unit
+ // e.g. requestedUnit: "m", availableUnit: "K")
+ if (unitsRelation < 0) {
+ availableResourceValue =
+ UnitsConversionUtil.convert(availableRI.getUnits(),
+ requestedRI.getUnits(), availableRI.getValue());
+
+ // requested resource unit is greater than available resource unit
+ // e.g. requestedUnit: "G", availableUnit: "M")
+ } else if (unitsRelation > 0) {
+ requestedResourceValue =
+ UnitsConversionUtil.convert(requestedRI.getUnits(),
+ availableRI.getUnits(), requestedRI.getValue());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Requested resource value after conversion: " +
+ requestedResourceValue);
+ LOG.info("Available resource value after conversion: " +
+ availableResourceValue);
+ }
+
+ if (requestedResourceValue > availableResourceValue) {
+ throwInvalidResourceException(reqResource, availableResource,
+ reqResourceName);
+ }
+ }
+ }
+
+ private static void throwInvalidResourceException(Resource reqResource,
+ Resource availableResource, String reqResourceName)
+ throws InvalidResourceRequestException {
+ throw new InvalidResourceRequestException(
+ "Invalid resource request, requested resource type=[" + reqResourceName
+ + "] < 0 or greater than maximum allowed allocation. Requested "
+ + "resource=" + reqResource + ", maximum allowed allocation="
+ + availableResource
+ + ", please note that maximum allowed allocation is calculated "
+ + "by scheduler based on maximum resource of registered "
+ + "NodeManagers, which might be less than configured "
+ + "maximum allocation="
+ + ResourceUtils.getResourceTypesMaximumAllocation());
+ }
+
private static void checkQueueLabelInLabelManager(String labelExpression,
RMContext rmContext) throws InvalidLabelResourceRequestException {
// check node label manager contains this label
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org