You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/01/23 09:56:07 UTC
[37/50] [abbrv] hadoop git commit: YARN-6619. AMRMClient Changes to
use the PlacementConstraint and SchcedulingRequest objects. (Arun Suresh via
wangda)
YARN-6619. AMRMClient Changes to use the PlacementConstraint and SchcedulingRequest objects. (Arun Suresh via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fef39e7f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fef39e7f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fef39e7f
Branch: refs/heads/YARN-6592
Commit: fef39e7f42bf1e1f0b1464bb5e9a5a3c73759cb0
Parents: 2f2c714
Author: Wangda Tan <wa...@apache.org>
Authored: Wed Jan 17 11:36:26 2018 -0800
Committer: Sunil G <su...@apache.org>
Committed: Tue Jan 23 15:20:23 2018 +0530
----------------------------------------------------------------------
.../hadoop/yarn/client/api/AMRMClient.java | 38 +++-
.../yarn/client/api/async/AMRMClientAsync.java | 48 +++++
.../api/async/impl/AMRMClientAsyncImpl.java | 49 ++++-
.../yarn/client/api/impl/AMRMClientImpl.java | 142 ++++++++++++-
.../client/api/impl/BaseAMRMClientTest.java | 212 +++++++++++++++++++
.../yarn/client/api/impl/TestAMRMClient.java | 156 +-------------
.../TestAMRMClientPlacementConstraints.java | 204 ++++++++++++++++++
.../rmcontainer/RMContainerImpl.java | 3 +
.../scheduler/AbstractYarnScheduler.java | 1 +
.../scheduler/SchedulerApplicationAttempt.java | 1 +
.../constraint/PlacementConstraintsUtil.java | 4 +-
11 files changed, 700 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index d3d1974..914a146 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
import java.util.function.Supplier;
import java.util.List;
@@ -39,7 +41,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -554,6 +558,18 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
}
/**
+ * Add a Collection of SchedulingRequests. The AMRMClient will ensure that
+ * all requests in the same batch are sent in the same allocate call.
+ * <p>
+ * This base implementation has an empty body, so the requests are ignored
+ * unless a subclass overrides the method.
+ * @param schedulingRequests Collection of Scheduling Requests.
+ */
+ @Public
+ @InterfaceStability.Unstable
+ public void addSchedulingRequests(
+ Collection<SchedulingRequest> schedulingRequests) {
+
+ }
+
+ /**
* Register the application master. This must be called before any
* other interaction
* @param appHostName Name of the host on which master is running
@@ -568,7 +584,27 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
int appHostPort,
String appTrackingUrl)
throws YarnException, IOException;
-
+
+ /**
+ * Register the application master. This must be called before any
+ * other interaction.
+ * <p>
+ * This base implementation performs no registration: it always throws a
+ * YarnException with the message "Not supported". Subclasses that support
+ * placement constraints override it.
+ * @param appHostName Name of the host on which master is running
+ * @param appHostPort Port master is listening on
+ * @param appTrackingUrl URL at which the master info can be seen
+ * @param placementConstraints Placement Constraints mappings.
+ * @return <code>RegisterApplicationMasterResponse</code> (never returned by
+ * this base implementation)
+ * @throws YarnException always, in this base implementation
+ * @throws IOException not thrown by this base implementation
+ */
+ @Public
+ @InterfaceStability.Unstable
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ String appHostName, int appHostPort, String appTrackingUrl,
+ Map<Set<String>, PlacementConstraint> placementConstraints)
+ throws YarnException, IOException {
+ throw new YarnException("Not supported");
+ }
+
/**
* Request additional containers and receive new container allocations.
* Requests made via <code>addContainerRequest</code> are sent to the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 2b82ad6..0af687b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.client.api.async;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -38,9 +40,12 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@@ -206,6 +211,19 @@ extends AbstractService {
Resource capability);
/**
+ * Add a Collection of SchedulingRequests. The AMRMClient will ensure that
+ * all requests in the same batch are sent in the same allocate call.
+ * @param schedulingRequests Collection of Scheduling Requests.
+ */
+ @Public
+ @Unstable
+ public void addSchedulingRequests(
+ Collection<SchedulingRequest> schedulingRequests) {
+
+ }
+
+
+ /**
* Returns all matching ContainerRequests that match the given Priority,
* ResourceName, ExecutionType and Capability.
*
@@ -250,6 +268,26 @@ extends AbstractService {
throws YarnException, IOException;
/**
+ * Register the application master. This must be called before any
+ * other interaction
+ * @param appHostName Name of the host on which master is running
+ * @param appHostPort Port master is listening on
+ * @param appTrackingUrl URL at which the master info can be seen
+ * @param placementConstraints Placement Constraints mappings.
+ * @return <code>RegisterApplicationMasterResponse</code>
+ * @throws YarnException
+ * @throws IOException
+ */
+ @Public
+ @Unstable
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ String appHostName, int appHostPort, String appTrackingUrl,
+ Map<Set<String>, PlacementConstraint> placementConstraints)
+ throws YarnException, IOException {
+ throw new YarnException("Not supported");
+ }
+
+ /**
* Unregister the application master. This must be called in the end.
* @param appStatus Success/Failure status of the master
* @param appMessage Diagnostics message on failure
@@ -494,6 +532,16 @@ extends AbstractService {
public void onContainersReceivedFromPreviousAttempts(
List<Container> containers) {
}
+
+ /**
+ * Called when the RM has rejected Scheduling Requests.
+ * <p>
+ * The default implementation does nothing; override it to react to
+ * rejections.
+ * @param rejectedSchedulingRequests Rejected Scheduling Requests.
+ */
+ @Public
+ @Unstable
+ public void onRequestsRejected(
+ List<RejectedSchedulingRequest> rejectedSchedulingRequests) {
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 33b0aba..4f04b66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -36,9 +38,12 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@@ -150,18 +155,50 @@ extends AMRMClientAsync<T> {
Resource capability) {
return client.getMatchingRequests(priority, resourceName, capability);
}
-
+
+ @Override
+ public void addSchedulingRequests(
+ Collection<SchedulingRequest> schedulingRequests) {
+ client.addSchedulingRequests(schedulingRequests);
+ }
+
/**
* Registers this application master with the resource manager. On successful
* registration, starts the heartbeating thread.
+ *
+ * @param appHostName Name of the host on which master is running
+ * @param appHostPort Port master is listening on
+ * @param appTrackingUrl URL at which the master info can be seen
+ * @return Register AM Response.
* @throws YarnException
* @throws IOException
*/
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnException, IOException {
+ return registerApplicationMaster(
+ appHostName, appHostPort, appTrackingUrl, null);
+ }
+
+ /**
+ * Registers this application master with the resource manager. On successful
+ * registration, starts the heartbeating thread.
+ *
+ * @param appHostName Name of the host on which master is running
+ * @param appHostPort Port master is listening on
+ * @param appTrackingUrl URL at which the master info can be seen
+ * @param placementConstraintsMap Placement Constraints Mapping.
+ * @return Register AM Response.
+ * @throws YarnException
+ * @throws IOException
+ */
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ String appHostName, int appHostPort, String appTrackingUrl,
+ Map<Set<String>, PlacementConstraint> placementConstraintsMap)
+ throws YarnException, IOException {
RegisterApplicationMasterResponse response = client
- .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
+ .registerApplicationMaster(appHostName, appHostPort,
+ appTrackingUrl, placementConstraintsMap);
heartbeatThread.start();
return response;
}
@@ -366,6 +403,14 @@ extends AMRMClientAsync<T> {
response.getContainersFromPreviousAttempts());
}
}
+ List<RejectedSchedulingRequest> rejectedSchedulingRequests =
+ response.getRejectedSchedulingRequests();
+ if (!rejectedSchedulingRequests.isEmpty()) {
+ if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
+ ((AMRMClientAsync.AbstractCallbackHandler) handler)
+ .onRequestsRejected(rejectedSchedulingRequests);
+ }
+ }
progress = handler.getProgress();
} catch (Throwable ex) {
handler.onError(ex);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 5507c07..8e2336f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -30,9 +30,11 @@ import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.AbstractMap.SimpleEntry;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -60,9 +62,11 @@ import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -106,6 +110,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected final Set<String> blacklistedNodes = new HashSet<String>();
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
+ private Map<Set<String>, PlacementConstraint> placementConstraints =
+ new HashMap<>();
+ private Queue<Collection<SchedulingRequest>> batchedSchedulingRequests =
+ new LinkedList<>();
+ private Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
+ new ConcurrentHashMap<>();
protected Map<String, Resource> resourceProfilesMap;
@@ -218,14 +228,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
super.serviceStop();
}
-
+
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnException, IOException {
+ return registerApplicationMaster(appHostName, appHostPort, appTrackingUrl,
+ null);
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ String appHostName, int appHostPort, String appTrackingUrl,
+ Map<Set<String>, PlacementConstraint> placementConstraintsMap)
+ throws YarnException, IOException {
this.appHostName = appHostName;
this.appHostPort = appHostPort;
this.appTrackingUrl = appTrackingUrl;
+ if (placementConstraintsMap != null && !placementConstraintsMap.isEmpty()) {
+ this.placementConstraints.putAll(placementConstraintsMap);
+ }
Preconditions.checkArgument(appHostName != null,
"The host name should not be null");
Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
@@ -240,6 +262,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
RegisterApplicationMasterRequest request =
RegisterApplicationMasterRequest.newInstance(this.appHostName,
this.appHostPort, this.appTrackingUrl);
+ if (!this.placementConstraints.isEmpty()) {
+ request.setPlacementConstraints(this.placementConstraints);
+ }
RegisterApplicationMasterResponse response =
rmClient.registerApplicationMaster(request);
synchronized (this) {
@@ -248,11 +273,23 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
this.resourceProfilesMap = response.getResourceProfiles();
+ List<Container> prevContainers =
+ response.getContainersFromPreviousAttempts();
+ removeFromOutstandingSchedulingRequests(prevContainers);
+ recreateSchedulingRequestBatch();
}
return response;
}
@Override
+ public void addSchedulingRequests(
+ Collection<SchedulingRequest> schedulingRequests) {
+ synchronized (this.batchedSchedulingRequests) {
+ this.batchedSchedulingRequests.add(schedulingRequests);
+ }
+ }
+
+ @Override
public AllocateResponse allocate(float progressIndicator)
throws YarnException, IOException {
Preconditions.checkArgument(progressIndicator >= 0,
@@ -288,6 +325,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
.responseId(lastResponseId).progress(progressIndicator)
.askList(askList).resourceBlacklistRequest(blacklistRequest)
.releaseList(releaseList).updateRequests(updateList).build();
+ populateSchedulingRequests(allocateRequest);
// clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
@@ -296,6 +334,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
try {
allocateResponse = rmClient.allocate(allocateRequest);
+ removeFromOutstandingSchedulingRequests(
+ allocateResponse.getAllocatedContainers());
+ removeFromOutstandingSchedulingRequests(
+ allocateResponse.getContainersFromPreviousAttempts());
} catch (ApplicationMasterNotRegisteredException e) {
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
@@ -397,6 +439,104 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return allocateResponse;
}
+  /**
+   * Drains every queued batch of scheduling requests into the given allocate
+   * request, recording each drained request as outstanding on the way.
+   * Nothing is set on the request when no batch is queued.
+   *
+   * @param allocateRequest request that receives the drained scheduling
+   *          requests.
+   */
+  private void populateSchedulingRequests(AllocateRequest allocateRequest) {
+    synchronized (this.batchedSchedulingRequests) {
+      if (this.batchedSchedulingRequests.isEmpty()) {
+        return;
+      }
+      List<SchedulingRequest> drained = new LinkedList<>();
+      while (!this.batchedSchedulingRequests.isEmpty()) {
+        // Peek first and dequeue last, so a batch only leaves the queue once
+        // it has been merged and recorded as outstanding.
+        Collection<SchedulingRequest> batch =
+            this.batchedSchedulingRequests.peek();
+        drained.addAll(batch);
+        addToOutstandingSchedulingRequests(batch);
+        this.batchedSchedulingRequests.remove();
+      }
+      allocateRequest.setSchedulingRequests(drained);
+    }
+  }
+
+  /**
+   * Collects every outstanding scheduling request into a single new batch and
+   * appends that batch to the queue of batches awaiting an allocate call. The
+   * batch is queued even when no request is outstanding.
+   */
+  private void recreateSchedulingRequestBatch() {
+    List<SchedulingRequest> pending = new ArrayList<>();
+    // NOTE(review): the per-tag lists are guarded by their own monitors in
+    // the add/remove helpers, not by this map monitor - confirm callers
+    // cannot race with those helpers here.
+    synchronized (this.outstandingSchedRequests) {
+      this.outstandingSchedRequests.values().forEach(pending::addAll);
+    }
+    synchronized (this.batchedSchedulingRequests) {
+      this.batchedSchedulingRequests.add(pending);
+    }
+  }
+
+  /**
+   * Records the given requests as outstanding, grouped by allocation tags.
+   * A request that matches one already tracked for the same tags overwrites
+   * that entry's number of allocations; otherwise the request is appended.
+   *
+   * @param requests scheduling requests that are about to be sent.
+   */
+  private void addToOutstandingSchedulingRequests(
+      Collection<SchedulingRequest> requests) {
+    for (SchedulingRequest incoming : requests) {
+      List<SchedulingRequest> tracked =
+          this.outstandingSchedRequests.computeIfAbsent(
+              incoming.getAllocationTags(), tags -> new LinkedList<>());
+      synchronized (tracked) {
+        // Stop at the first tracked request that matches the incoming one.
+        SchedulingRequest existing = null;
+        Iterator<SchedulingRequest> it = tracked.iterator();
+        while (existing == null && it.hasNext()) {
+          SchedulingRequest candidate = it.next();
+          if (isMatching(incoming, candidate)) {
+            existing = candidate;
+          }
+        }
+        if (existing == null) {
+          tracked.add(incoming);
+        } else {
+          // The new count replaces the tracked one; it is not added to it.
+          existing.getResourceSizing().setNumAllocations(
+              incoming.getResourceSizing().getNumAllocations());
+        }
+      }
+    }
+  }
+
+  /**
+   * Tells whether two scheduling requests describe the same ask, i.e. they
+   * share priority, execution type and allocation request id.
+   *
+   * @param schedReq1 first request.
+   * @param schedReq2 second request.
+   * @return true when all three attributes are equal.
+   */
+  private boolean isMatching(SchedulingRequest schedReq1,
+      SchedulingRequest schedReq2) {
+    // The execution type must be compared across the two requests; comparing
+    // schedReq1 with itself made this clause always true.
+    return schedReq1.getPriority().equals(schedReq2.getPriority()) &&
+        schedReq1.getExecutionType().getExecutionType().equals(
+            schedReq2.getExecutionType().getExecutionType()) &&
+        schedReq1.getAllocationRequestId() ==
+            schedReq2.getAllocationRequestId();
+  }
+
+  /**
+   * Accounts for containers handed back by the RM: for each container that
+   * carries allocation tags, every outstanding request tracked under those
+   * tags with the same priority and allocation request id has its number of
+   * allocations decremented, and is dropped once nothing remains.
+   *
+   * @param containers containers received from the RM; null or empty is a
+   *          no-op.
+   */
+  private void removeFromOutstandingSchedulingRequests(
+      Collection<Container> containers) {
+    if (containers == null || containers.isEmpty()) {
+      return;
+    }
+    for (Container container : containers) {
+      if (container.getAllocationTags() == null ||
+          container.getAllocationTags().isEmpty()) {
+        // Containers without allocation tags cannot be matched to a request.
+        continue;
+      }
+      List<SchedulingRequest> schedReqs =
+          this.outstandingSchedRequests.get(container.getAllocationTags());
+      if (schedReqs == null) {
+        continue;
+      }
+      // The list is only inspected while holding its monitor; iterating an
+      // empty list is a no-op, so no separate emptiness check is needed.
+      synchronized (schedReqs) {
+        Iterator<SchedulingRequest> iter = schedReqs.iterator();
+        while (iter.hasNext()) {
+          SchedulingRequest schedReq = iter.next();
+          if (!schedReq.getPriority().equals(container.getPriority()) ||
+              schedReq.getAllocationRequestId() !=
+                  container.getAllocationRequestId()) {
+            continue;
+          }
+          int remaining =
+              schedReq.getResourceSizing().getNumAllocations() - 1;
+          // Use <= 0 so a request whose count was already 0 is dropped
+          // instead of being kept with a negative count.
+          if (remaining <= 0) {
+            iter.remove();
+          } else {
+            schedReq.getResourceSizing().setNumAllocations(remaining);
+          }
+        }
+      }
+    }
+  }
+
private List<UpdateContainerRequest> createUpdateList() {
List<UpdateContainerRequest> updateList = new ArrayList<>();
for (Map.Entry<ContainerId, SimpleEntry<Container,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java
new file mode 100644
index 0000000..d18652f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing AMRMClient.
+ */
+public class BaseAMRMClientTest {
+
+ protected Configuration conf = null;
+ protected MiniYARNCluster yarnCluster = null;
+ protected YarnClient yarnClient = null;
+ protected List<NodeReport> nodeReports = null;
+ protected ApplicationAttemptId attemptId = null;
+
+ protected String schedulerName = CapacityScheduler.class.getName();
+ protected boolean autoUpdate = false;
+
+ protected int nodeCount = 3;
+ protected long amExpireMs = 4000;
+ protected int rollingIntervalSec = 13;
+
+
+ protected Resource capability;
+ protected Priority priority;
+ protected Priority priority2;
+ protected String node;
+ protected String rack;
+ protected String[] nodes;
+ protected String[] racks;
+
+ @Before
+ public void setup() throws Exception {
+ conf = new YarnConfiguration();
+ createClusterAndStartApplication(conf);
+ }
+
+  /**
+   * Starts a MiniYARNCluster and a YarnClient with the given configuration,
+   * submits a test application, waits until its current attempt reaches the
+   * LAUNCHED state and then adds the attempt's AMRM token to the current
+   * user.
+   *
+   * @param conf configuration applied to the cluster and the client; it is
+   *          modified in place and kept in the {@code conf} field.
+   * @throws Exception if the cluster, the client or the application cannot
+   *           be started.
+   */
+  protected void createClusterAndStartApplication(Configuration conf)
+      throws Exception {
+    // start minicluster
+    this.conf = conf;
+    if (autoUpdate) {
+      conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
+    }
+    conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
+    conf.setLong(
+        YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+        rollingIntervalSec);
+    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, amExpireMs);
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+    // set the minimum allocation so that resource decrease can go under 1024
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
+    yarnCluster = new MiniYARNCluster(
+        TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    // start rm client
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+
+    // get node info
+    assertTrue("All node managers did not connect to the RM within the "
+        + "allotted 5-second timeout",
+        yarnCluster.waitForNodeManagersToConnect(5000L));
+    nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+    assertEquals("Not all node managers were reported running",
+        nodeCount, nodeReports.size());
+
+    priority = Priority.newInstance(1);
+    priority2 = Priority.newInstance(2);
+    capability = Resource.newInstance(1024, 1);
+
+    node = nodeReports.get(0).getNodeId().getHost();
+    rack = nodeReports.get(0).getRackName();
+    nodes = new String[]{ node };
+    racks = new String[]{ rack };
+
+    // submit new app
+    ApplicationSubmissionContext appContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer =
+        BuilderUtils.newContainerLaunchContext(
+            Collections.<String, LocalResource> emptyMap(),
+            new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+            new HashMap<String, ByteBuffer>(), null,
+            new HashMap<ApplicationAccessType, String>());
+    appContext.setAMContainerSpec(amContainer);
+    appContext.setResource(Resource.newInstance(1024, 1));
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    RMAppAttempt appAttempt = null;
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() ==
+          YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        appAttempt =
+            yarnCluster.getResourceManager().getRMContext().getRMApps()
+                .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+        while (appAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
+          // Back off briefly instead of spinning on the attempt state.
+          Thread.sleep(10);
+        }
+        break;
+      }
+      // Back off briefly between polls rather than hammering the RM.
+      Thread.sleep(10);
+    }
+    // Just dig into the ResourceManager and get the AMRMToken just for the sake
+    // of testing.
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+    // emulate RM setup of AMRM token in credentials by adding the token
+    // *before* setting the token service
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    appAttempt.getAMRMToken().setService(
+        ClientRMProxy.getAMRMTokenService(conf));
+  }
+
+  /**
+   * Kills the test application, then stops the YarnClient and the mini
+   * cluster if they are running. It tolerates a setup that failed before the
+   * client or the attempt id was assigned, and still stops the services when
+   * killing the application fails.
+   *
+   * @throws YarnException if killing the application fails.
+   * @throws IOException if killing the application fails.
+   */
+  @After
+  public void teardown() throws YarnException, IOException {
+    try {
+      // Either field may still be null when setup failed part-way through.
+      if (yarnClient != null && attemptId != null) {
+        yarnClient.killApplication(attemptId.getApplicationId());
+      }
+    } finally {
+      attemptId = null;
+
+      if (yarnClient != null &&
+          yarnClient.getServiceState() == Service.STATE.STARTED) {
+        yarnClient.stop();
+      }
+      if (yarnCluster != null &&
+          yarnCluster.getServiceState() == Service.STATE.STARTED) {
+        yarnCluster.stop();
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 3ecc5cd..b059118 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -43,7 +43,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -56,24 +55,18 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
-import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -81,10 +74,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedule
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -97,26 +88,8 @@ import org.eclipse.jetty.util.log.Log;
* Test application master client class to resource manager.
*/
@RunWith(value = Parameterized.class)
-public class TestAMRMClient {
- private String schedulerName = null;
- private boolean autoUpdate = false;
- private Configuration conf = null;
- private MiniYARNCluster yarnCluster = null;
- private YarnClient yarnClient = null;
- private List<NodeReport> nodeReports = null;
- private ApplicationAttemptId attemptId = null;
- private int nodeCount = 3;
-
- static final int rolling_interval_sec = 13;
- static final long am_expire_ms = 4000;
-
- private Resource capability;
- private Priority priority;
- private Priority priority2;
- private String node;
- private String rack;
- private String[] nodes;
- private String[] racks;
+public class TestAMRMClient extends BaseAMRMClientTest{
+
private final static int DEFAULT_ITERATION = 3;
public TestAMRMClient(String schedulerName, boolean autoUpdate) {
@@ -134,127 +107,6 @@ public class TestAMRMClient {
});
}
- @Before
- public void setup() throws Exception {
- conf = new YarnConfiguration();
- createClusterAndStartApplication(conf);
- }
-
- private void createClusterAndStartApplication(Configuration conf)
- throws Exception {
- // start minicluster
- this.conf = conf;
- if (autoUpdate) {
- conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
- }
- conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
- conf.setLong(
- YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
- rolling_interval_sec);
- conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
- conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
- // set the minimum allocation so that resource decrease can go under 1024
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
- conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
- conf.setBoolean(
- YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
- conf.setInt(
- YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
- yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
- yarnCluster.init(conf);
- yarnCluster.start();
-
- // start rm client
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
- yarnClient.start();
-
- // get node info
- assertTrue("All node managers did not connect to the RM within the "
- + "allotted 5-second timeout",
- yarnCluster.waitForNodeManagersToConnect(5000L));
- nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
- assertEquals("Not all node managers were reported running",
- nodeCount, nodeReports.size());
-
- priority = Priority.newInstance(1);
- priority2 = Priority.newInstance(2);
- capability = Resource.newInstance(1024, 1);
-
- node = nodeReports.get(0).getNodeId().getHost();
- rack = nodeReports.get(0).getRackName();
- nodes = new String[]{ node };
- racks = new String[]{ rack };
-
- // submit new app
- ApplicationSubmissionContext appContext =
- yarnClient.createApplication().getApplicationSubmissionContext();
- ApplicationId appId = appContext.getApplicationId();
- // set the application name
- appContext.setApplicationName("Test");
- // Set the priority for the application master
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(0);
- appContext.setPriority(pri);
- // Set the queue to which this application is to be submitted in the RM
- appContext.setQueue("default");
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer =
- BuilderUtils.newContainerLaunchContext(
- Collections.<String, LocalResource> emptyMap(),
- new HashMap<String, String>(), Arrays.asList("sleep", "100"),
- new HashMap<String, ByteBuffer>(), null,
- new HashMap<ApplicationAccessType, String>());
- appContext.setAMContainerSpec(amContainer);
- appContext.setResource(Resource.newInstance(1024, 1));
- // Create the request to send to the applications manager
- SubmitApplicationRequest appRequest = Records
- .newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
- // Submit the application to the applications manager
- yarnClient.submitApplication(appContext);
-
- // wait for app to start
- RMAppAttempt appAttempt = null;
- while (true) {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
- attemptId = appReport.getCurrentApplicationAttemptId();
- appAttempt =
- yarnCluster.getResourceManager().getRMContext().getRMApps()
- .get(attemptId.getApplicationId()).getCurrentAppAttempt();
- while (true) {
- if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
- break;
- }
- }
- break;
- }
- }
- // Just dig into the ResourceManager and get the AMRMToken just for the sake
- // of testing.
- UserGroupInformation.setLoginUser(UserGroupInformation
- .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-
- // emulate RM setup of AMRM token in credentials by adding the token
- // *before* setting the token service
- UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
- appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf));
- }
-
- @After
- public void teardown() throws YarnException, IOException {
- yarnClient.killApplication(attemptId.getApplicationId());
- attemptId = null;
-
- if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
- yarnClient.stop();
- }
- if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) {
- yarnCluster.stop();
- }
- }
-
@Test (timeout = 60000)
public void testAMRMClientNoMatchingRequests()
throws IOException, YarnException {
@@ -905,7 +757,7 @@ public class TestAMRMClient {
initAMRMClientAndTest(false);
}
- private void initAMRMClientAndTest(boolean useAllocReqId)
+ protected void initAMRMClientAndTest(boolean useAllocReqId)
throws YarnException, IOException {
AMRMClient<ContainerRequest> amClient = null;
try {
@@ -1946,7 +1798,7 @@ public class TestAMRMClient {
// Wait for enough time and make sure the roll_over happens
// At mean time, the old AMRMToken should continue to work
while (System.currentTimeMillis() - startTime <
- rolling_interval_sec * 1000) {
+ rollingIntervalSec * 1000) {
amClient.allocate(0.1f);
sleep(1000);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
new file mode 100644
index 0000000..fdc8d58
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+
+/**
+ * Test Placement Constraints and Scheduling Requests.
+ */
+public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
+
+  /**
+   * Registers an application master with anti-affinity placement constraints
+   * for the tags "foo" and "bar", submits scheduling requests for both tags
+   * through {@link AMRMClientAsync}, and checks which requests are allocated
+   * and which are rejected.
+   *
+   * Expected outcome: 6 allocated containers spread over 3 nodes (one "foo"
+   * and one "bar" per node) and 2 rejected requests (one per tag).
+   *
+   * @throws Exception if the cluster cannot be (re)started or a client call
+   *     fails
+   */
+  @Test(timeout=60000)
+  public void testAMRMClientWithPlacementConstraints()
+      throws Exception {
+    // we have to create a new instance of MiniYARNCluster to avoid SASL qop
+    // mismatches between client and server
+    teardown();
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    createClusterAndStartApplication(conf);
+
+    AMRMClient<AMRMClient.ContainerRequest> amClient =
+        AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
+    amClient.setNMTokenCache(new NMTokenCache());
+    //asserting we are not using the singleton instance cache
+    Assert.assertNotSame(NMTokenCache.getSingleton(),
+        amClient.getNMTokenCache());
+
+    // The handler below appends to these lists while this thread polls them.
+    // NOTE(review): callbacks are presumably delivered on a thread owned by
+    // the async client - hence the synchronized wrappers.
+    final List<Container> allocatedContainers =
+        Collections.synchronizedList(new ArrayList<>());
+    final List<RejectedSchedulingRequest> rejectedSchedulingRequests =
+        Collections.synchronizedList(new ArrayList<>());
+    AMRMClientAsync<AMRMClient.ContainerRequest> asyncClient =
+        new AMRMClientAsyncImpl<>(amClient, 1000,
+            new AMRMClientAsync.AbstractCallbackHandler() {
+              @Override
+              public void onContainersAllocated(List<Container> containers) {
+                allocatedContainers.addAll(containers);
+              }
+
+              @Override
+              public void onRequestsRejected(
+                  List<RejectedSchedulingRequest> rejReqs) {
+                rejectedSchedulingRequests.addAll(rejReqs);
+              }
+
+              @Override
+              public void onContainersCompleted(
+                  List<ContainerStatus> statuses) {}
+              @Override
+              public void onContainersUpdated(
+                  List<UpdatedContainer> containers) {}
+              @Override
+              public void onShutdownRequest() {}
+              @Override
+              public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+              @Override
+              public void onError(Throwable e) {}
+
+              @Override
+              public float getProgress() {
+                return 0.1f;
+              }
+            });
+
+    asyncClient.init(conf);
+    asyncClient.start();
+    try {
+      Map<Set<String>, PlacementConstraint> pcMapping = new HashMap<>();
+      pcMapping.put(Collections.singleton("foo"),
+          PlacementConstraints.build(
+              PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+      pcMapping.put(Collections.singleton("bar"),
+          PlacementConstraints.build(
+              PlacementConstraints.targetNotIn(NODE, allocationTag("bar"))));
+      asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping);
+
+      // Send two types of requests - 4 with source tag "foo", each with
+      // numAlloc = 1, and 1 with source tag "bar" and numAlloc = 4. Both
+      // should be handled similarly, i.e. since there are only 3 nodes,
+      // 2 scheduling requests - 1 with source tag "foo" and 1 with source
+      // tag "bar" - should get rejected.
+      asyncClient.addSchedulingRequests(
+          Arrays.asList(
+              // 4 reqs with numAlloc = 1
+              schedulingRequest(1, 1, 1, 1, 512, "foo"),
+              schedulingRequest(1, 1, 2, 1, 512, "foo"),
+              schedulingRequest(1, 1, 3, 1, 512, "foo"),
+              schedulingRequest(1, 1, 4, 1, 512, "foo"),
+              // 1 req with numAlloc = 4
+              schedulingRequest(4, 1, 5, 1, 512, "bar")));
+
+      // Poll until the expected allocations and rejections have been
+      // reported, or the helper gives up.
+      waitForContainerAllocation(allocatedContainers,
+          rejectedSchedulingRequests, 6, 2);
+
+      // Assert on snapshots so that a late callback cannot modify a list
+      // while it is being streamed.
+      List<Container> allocated = new ArrayList<>(allocatedContainers);
+      List<RejectedSchedulingRequest> rejected =
+          new ArrayList<>(rejectedSchedulingRequests);
+
+      Assert.assertEquals(6, allocated.size());
+      Map<NodeId, List<Container>> containersPerNode =
+          allocated.stream().collect(
+              Collectors.groupingBy(Container::getNodeId));
+
+      // Ensure 2 containers allocated per node.
+      // Each node should have a "foo" and a "bar" container.
+      Assert.assertEquals(3, containersPerNode.entrySet().size());
+      HashSet<String> srcTags = new HashSet<>(Arrays.asList("foo", "bar"));
+      containersPerNode.entrySet().forEach(
+          x ->
+              Assert.assertEquals(
+                  srcTags,
+                  x.getValue()
+                      .stream()
+                      .map(y -> y.getAllocationTags().iterator().next())
+                      .collect(Collectors.toSet()))
+      );
+
+      // Ensure 2 rejected requests - 1 of "foo" and 1 of "bar"
+      Assert.assertEquals(2, rejected.size());
+      Assert.assertEquals(srcTags,
+          rejected
+              .stream()
+              .map(x -> x.getRequest().getAllocationTags().iterator().next())
+              .collect(Collectors.toSet()));
+    } finally {
+      // Stop the client even when an assertion above fails.
+      asyncClient.stop();
+    }
+  }
+
+  /**
+   * Polls once per second until at least {@code containerNum} containers and
+   * at least {@code rejNum} rejected requests have been collected, or until
+   * the poll budget is used up. Returns silently in either case; callers
+   * assert on the list contents afterwards.
+   *
+   * @param allocatedContainers list that receives allocated containers
+   * @param rejectedRequests list that receives rejected scheduling requests
+   * @param containerNum number of allocated containers to wait for
+   * @param rejNum number of rejected requests to wait for
+   * @throws Exception if the sleeping thread is interrupted
+   */
+  private static void waitForContainerAllocation(
+      List<Container> allocatedContainers,
+      List<RejectedSchedulingRequest> rejectedRequests,
+      int containerNum, int rejNum) throws Exception {
+
+    int remainingPolls = 10;
+    while (remainingPolls >= 0) {
+      boolean satisfied = allocatedContainers.size() >= containerNum
+          && rejectedRequests.size() >= rejNum;
+      if (satisfied) {
+        return;
+      }
+      remainingPolls--;
+      sleep(1000);
+    }
+  }
+
+  /**
+   * Builds a scheduling request with execution type
+   * {@link ExecutionType#GUARANTEED} by delegating to the overload that
+   * takes an explicit execution type.
+   *
+   * @param numAllocations number of allocations asked for
+   * @param priority priority value of the request
+   * @param allocReqId allocation request id
+   * @param cores virtual cores per allocation
+   * @param mem memory per allocation
+   * @param tags allocation tags attached to the request
+   * @return the built scheduling request
+   */
+  private static SchedulingRequest schedulingRequest(int numAllocations,
+      int priority, long allocReqId, int cores, int mem, String... tags) {
+    final ExecutionType guaranteed = ExecutionType.GUARANTEED;
+    SchedulingRequest request = schedulingRequest(
+        numAllocations, priority, allocReqId, cores, mem, guaranteed, tags);
+    return request;
+  }
+
+  /**
+   * Builds a scheduling request from primitive values.
+   *
+   * @param numAllocations number of allocations asked for
+   * @param priority priority value of the request
+   * @param allocReqId allocation request id
+   * @param cores virtual cores per allocation
+   * @param mem memory per allocation
+   * @param execType execution type of the request
+   * @param tags allocation tags attached to the request
+   * @return the built scheduling request
+   */
+  private static SchedulingRequest schedulingRequest(int numAllocations,
+      int priority, long allocReqId, int cores, int mem,
+      ExecutionType execType, String... tags) {
+    Priority requestPriority = Priority.newInstance(priority);
+    Set<String> tagSet = new HashSet<>(Arrays.asList(tags));
+    // NOTE(review): the literal true is presumably the "enforce execution
+    // type" flag - confirm against ExecutionTypeRequest.
+    ExecutionTypeRequest typeRequest =
+        ExecutionTypeRequest.newInstance(execType, true);
+    Resource perAllocation = Resource.newInstance(mem, cores);
+    ResourceSizing sizing =
+        ResourceSizing.newInstance(numAllocations, perAllocation);
+
+    return SchedulingRequest.newBuilder()
+        .priority(requestPriority)
+        .allocationRequestId(allocReqId)
+        .allocationTags(tagSet)
+        .executionType(typeRequest)
+        .resourceSizing(sizing)
+        .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 563df0d..a504221 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -262,6 +262,9 @@ public class RMContainerImpl implements RMContainer {
rmContext.getSystemMetricsPublisher().containerCreated(
this, this.creationTime);
}
+ if (this.container != null) {
+ this.allocationTags = this.container.getAllocationTags();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index d4dcc77..b2ea54a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -589,6 +589,7 @@ public abstract class AbstractYarnScheduler
container.setVersion(status.getVersion());
container.setExecutionType(status.getExecutionType());
container.setAllocationRequestId(status.getAllocationRequestId());
+ container.setAllocationTags(status.getAllocationTags());
ApplicationAttemptId attemptId =
container.getId().getApplicationAttemptId();
RMContainer rmContainer = new RMContainerImpl(container,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 88a9049..3930a35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -672,6 +672,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
containerType, container.getExecutionType(),
container.getAllocationRequestId(),
rmContainer.getAllocationTags()));
+ container.setAllocationTags(rmContainer.getAllocationTags());
updateNMToken(container);
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fef39e7f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index 956a3c9..c4b82e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -64,12 +64,12 @@ public final class PlacementConstraintsUtil {
throws InvalidAllocationTagsQueryException {
long minScopeCardinality = 0;
long maxScopeCardinality = 0;
- if (sc.getScope() == PlacementConstraints.NODE) {
+ if (sc.getScope().equals(PlacementConstraints.NODE)) {
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
te.getTargetValues(), Long::max);
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
te.getTargetValues(), Long::min);
- } else if (sc.getScope() == PlacementConstraints.RACK) {
+ } else if (sc.getScope().equals(PlacementConstraints.RACK)) {
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
te.getTargetValues(), Long::max);
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org