You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/12/27 20:46:32 UTC
[2/2] hadoop git commit: YARN-5938. Refactoring
OpportunisticContainerAllocator to use SchedulerRequestKey instead of
Priority and other misc fixes (asuresh)
YARN-5938. Refactoring OpportunisticContainerAllocator to use SchedulerRequestKey instead of Priority and other misc fixes (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ac1e5d4f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ac1e5d4f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ac1e5d4f
Branch: refs/heads/trunk
Commit: ac1e5d4f77e3b9df8dcacb0b1f72eecc27931eb8
Parents: c3973e7
Author: Arun Suresh <as...@apache.org>
Authored: Tue Dec 27 11:54:57 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Tue Dec 27 12:40:44 2016 -0800
----------------------------------------------------------------------
.../api/records/UpdateContainerRequest.java | 11 +
.../impl/pb/AllocateResponsePBImpl.java | 4 +-
.../server/api/protocolrecords/RemoteNode.java | 7 +
.../OpportunisticContainerAllocator.java | 61 +--
.../OpportunisticContainerContext.java | 38 +-
.../server/scheduler/SchedulerRequestKey.java | 130 ++++++
.../scheduler/DistributedScheduler.java | 11 +-
.../ApplicationMasterService.java | 423 ++++++++++---------
...pportunisticContainerAllocatorAMService.java | 45 +-
.../rmcontainer/RMContainer.java | 3 +-
.../rmcontainer/RMContainerImpl.java | 2 +-
.../rmcontainer/RMContainerReservedEvent.java | 2 +-
.../scheduler/AppSchedulingInfo.java | 3 +
.../scheduler/SchedulerApplicationAttempt.java | 4 +-
.../scheduler/SchedulerNode.java | 1 +
.../scheduler/SchedulerRequestKey.java | 122 ------
.../scheduler/capacity/LeafQueue.java | 7 +-
.../allocator/IncreaseContainerAllocator.java | 2 +-
.../allocator/RegularContainerAllocator.java | 2 +-
.../scheduler/common/SchedulerContainer.java | 2 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 3 +-
.../common/fica/FiCaSchedulerNode.java | 2 +-
.../distributed/NodeQueueLoadMonitor.java | 16 +-
.../scheduler/fair/FSAppAttempt.java | 2 +-
.../scheduler/fair/FSSchedulerNode.java | 2 +-
.../scheduler/fifo/FifoAppAttempt.java | 4 +-
.../scheduler/fifo/FifoScheduler.java | 4 +-
.../placement/SchedulingPlacementSet.java | 2 +-
.../server/resourcemanager/Application.java | 2 +-
.../yarn/server/resourcemanager/Task.java | 2 +-
...alCapacityPreemptionPolicyMockFramework.java | 3 +-
...estProportionalCapacityPreemptionPolicy.java | 3 +-
.../scheduler/TestAppSchedulingInfo.java | 1 +
.../TestSchedulerApplicationAttempt.java | 2 +
.../capacity/TestCapacityScheduler.java | 2 +-
.../scheduler/capacity/TestLeafQueue.java | 2 +-
.../scheduler/capacity/TestReservations.java | 2 +-
.../scheduler/capacity/TestUtils.java | 2 +-
.../fair/TestContinuousScheduling.java | 2 +-
.../scheduler/fair/TestFSAppAttempt.java | 2 +-
40 files changed, 498 insertions(+), 442 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
index 200dea3..e4f7a82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
@@ -159,6 +159,17 @@ public abstract class UpdateContainerRequest extends AbstractResourceRequest {
}
@Override
+ public String toString() {
+ // Fields are comma-separated; the last field is followed directly by the
+ // closing brace so the output carries no dangling ", " separator.
+ return "UpdateReq{" +
+ "containerId=" + getContainerId() + ", " +
+ "containerVersion=" + getContainerVersion() + ", " +
+ "targetExecType=" + getExecutionType() + ", " +
+ "targetCapability=" + getCapability() + ", " +
+ "updateType=" + getContainerUpdateType() +
+ "}";
+ }
+
+ @Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index c0d52a6..de3fc47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -282,8 +282,8 @@ public class AllocateResponsePBImpl extends AllocateResponse {
final List<Container> containers) {
if (containers == null)
return;
- // this looks like a bug because it results in append and not set
initLocalNewContainerList();
+ allocatedContainers.clear();
allocatedContainers.addAll(containers);
}
@@ -299,6 +299,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
if (containers == null)
return;
initLocalUpdatedContainerList();
+ updatedContainers.clear();
updatedContainers.addAll(containers);
}
@@ -315,6 +316,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
if (containers == null)
return;
initLocalFinishedContainerList();
+ completedContainersStatuses.clear();
completedContainersStatuses.addAll(containers);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
index 2b76257..e403a12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
@@ -87,4 +87,11 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
public int compareTo(RemoteNode other) {
return this.getNodeId().compareTo(other.getNodeId());
}
+
+ /**
+ * Builds a one-line description of this node of the form
+ * {@code RemoteNode{nodeId=..., httpAddress=...}} from the values returned
+ * by {@code getNodeId()} and {@code getHttpAddress()}.
+ *
+ * @return the textual description of this node
+ */
+ @Override
+ public String toString() {
+ return "RemoteNode{" +
+ "nodeId=" + getNodeId() + ", " +
+ "httpAddress=" + getHttpAddress() + "}";
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 16436bd..c1300b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -22,8 +22,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -192,7 +199,8 @@ public class OpportunisticContainerAllocator {
/**
* Allocate OPPORTUNISTIC containers.
- * @param request AllocateRequest
+ * @param blackList Resource BlackList Request
+ * @param oppResourceReqs Opportunistic Resource Requests
* @param applicationAttemptId ApplicationAttemptId
* @param opportContext App specific OpportunisticContainerContext
* @param rmIdentifier RM Identifier
@@ -200,32 +208,24 @@ public class OpportunisticContainerAllocator {
* @return List of Containers.
* @throws YarnException YarnException
*/
- public List<Container> allocateContainers(
- AllocateRequest request, ApplicationAttemptId applicationAttemptId,
+ public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
+ List<ResourceRequest> oppResourceReqs,
+ ApplicationAttemptId applicationAttemptId,
OpportunisticContainerContext opportContext, long rmIdentifier,
String appSubmitter) throws YarnException {
- // Update released containers.
- List<ContainerId> releasedContainers = request.getReleaseList();
- int numReleasedContainers = releasedContainers.size();
- if (numReleasedContainers > 0) {
- LOG.info("AttemptID: " + applicationAttemptId + " released: "
- + numReleasedContainers);
- opportContext.getContainersAllocated().removeAll(releasedContainers);
- }
// Update black list.
- ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
- if (rbr != null) {
- opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
- opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
+ if (blackList != null) {
+ opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
+ opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
}
// Add OPPORTUNISTIC requests to the outstanding ones.
- opportContext.addToOutstandingReqs(request.getAskList());
+ opportContext.addToOutstandingReqs(oppResourceReqs);
// Satisfy the outstanding OPPORTUNISTIC requests.
List<Container> allocatedContainers = new ArrayList<>();
- for (Priority priority :
+ for (SchedulerRequestKey schedulerKey :
opportContext.getOutstandingOpReqs().descendingKeySet()) {
// Allocated containers :
// Key = Requested Capability,
@@ -234,7 +234,7 @@ public class OpportunisticContainerAllocator {
// we need the requested capability (key) to match against
// the outstanding reqs)
Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
- opportContext, priority, applicationAttemptId, appSubmitter);
+ opportContext, schedulerKey, applicationAttemptId, appSubmitter);
for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
opportContext.matchAllocationToOutstandingRequest(
e.getKey(), e.getValue());
@@ -246,19 +246,22 @@ public class OpportunisticContainerAllocator {
}
private Map<Resource, List<Container>> allocate(long rmIdentifier,
- OpportunisticContainerContext appContext, Priority priority,
+ OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
ApplicationAttemptId appAttId, String userName) throws YarnException {
Map<Resource, List<Container>> containers = new HashMap<>();
for (ResourceRequest anyAsk :
- appContext.getOutstandingOpReqs().get(priority).values()) {
+ appContext.getOutstandingOpReqs().get(schedKey).values()) {
allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
appContext.getContainerIdGenerator(), appContext.getBlacklist(),
appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
- LOG.info("Opportunistic allocation requested for ["
- + "priority=" + anyAsk.getPriority()
- + ", num_containers=" + anyAsk.getNumContainers()
- + ", capability=" + anyAsk.getCapability() + "]"
- + " allocated = " + containers.get(anyAsk.getCapability()).size());
+ if (!containers.isEmpty()) {
+ LOG.info("Opportunistic allocation requested for ["
+ + "priority=" + anyAsk.getPriority()
+ + ", allocationRequestId=" + anyAsk.getAllocationRequestId()
+ + ", num_containers=" + anyAsk.getNumContainers()
+ + ", capability=" + anyAsk.getCapability() + "]"
+ + " allocated = " + containers.keySet());
+ }
}
return containers;
}
@@ -282,7 +285,9 @@ public class OpportunisticContainerAllocator {
nodesForScheduling.add(nodeEntry.getValue());
}
if (nodesForScheduling.isEmpty()) {
- LOG.warn("No nodes available for allocating opportunistic containers.");
+ LOG.warn("No nodes available for allocating opportunistic containers. [" +
+ "allNodes=" + allNodes + ", " +
+ "blacklist=" + blacklist + "]");
return;
}
int numAllocated = 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index 725e2d9..a2f9f4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -18,12 +18,7 @@
package org.apache.hadoop.yarn.server.scheduler;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
@@ -52,9 +47,6 @@ public class OpportunisticContainerContext {
private static final Logger LOG = LoggerFactory
.getLogger(OpportunisticContainerContext.class);
- // Currently just used to keep track of allocated containers.
- // Can be used for reporting stats later.
- private Set<ContainerId> containersAllocated = new HashSet<>();
private AllocationParams appParams =
new AllocationParams();
private ContainerIdGenerator containerIdGenerator =
@@ -69,13 +61,9 @@ public class OpportunisticContainerContext {
// Resource Name (host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
// ResourceRequest (ask).
- private final TreeMap<Priority, Map<Resource, ResourceRequest>>
+ private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
outstandingOpReqs = new TreeMap<>();
- public Set<ContainerId> getContainersAllocated() {
- return containersAllocated;
- }
-
public AllocationParams getAppParams() {
return appParams;
}
@@ -119,20 +107,11 @@ public class OpportunisticContainerContext {
return blacklist;
}
- public TreeMap<Priority, Map<Resource, ResourceRequest>>
+ public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
getOutstandingOpReqs() {
return outstandingOpReqs;
}
- public void updateCompletedContainers(AllocateResponse allocateResponse) {
- for (ContainerStatus cs :
- allocateResponse.getCompletedContainersStatuses()) {
- if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
- containersAllocated.remove(cs.getContainerId());
- }
- }
- }
-
/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
* (Priority, ResourceName, Capability) and adds to the outstanding
@@ -144,7 +123,7 @@ public class OpportunisticContainerContext {
*/
public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
for (ResourceRequest request : resourceAsks) {
- Priority priority = request.getPriority();
+ SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
// TODO: Extend for Node/Rack locality. We only handle ANY requests now
if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
@@ -156,10 +135,10 @@ public class OpportunisticContainerContext {
}
Map<Resource, ResourceRequest> reqMap =
- outstandingOpReqs.get(priority);
+ outstandingOpReqs.get(schedulerKey);
if (reqMap == null) {
reqMap = new HashMap<>();
- outstandingOpReqs.put(priority, reqMap);
+ outstandingOpReqs.put(schedulerKey, reqMap);
}
ResourceRequest resourceRequest = reqMap.get(request.getCapability());
@@ -171,7 +150,8 @@ public class OpportunisticContainerContext {
resourceRequest.getNumContainers() + request.getNumContainers());
}
if (ResourceRequest.isAnyLocation(request.getResourceName())) {
- LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+ // Keep a space between "at" and "priority" across the split literal.
+ LOG.info("# of outstandingOpReqs in ANY (at " +
+ "priority = " + schedulerKey.getPriority()
+ ", with capability = " + request.getCapability() + " ) : "
+ resourceRequest.getNumContainers());
}
@@ -187,9 +167,9 @@ public class OpportunisticContainerContext {
public void matchAllocationToOutstandingRequest(Resource capability,
List<Container> allocatedContainers) {
for (Container c : allocatedContainers) {
- containersAllocated.add(c.getId());
+ SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c);
Map<Resource, ResourceRequest> asks =
- outstandingOpReqs.get(c.getPriority());
+ outstandingOpReqs.get(schedulerKey);
if (asks == null) {
continue;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
new file mode 100644
index 0000000..9b7edbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+
+/**
+ * Composite key for outstanding scheduler requests for any schedulable entity.
+ * It combines the {@link Priority} of a request with its allocation request
+ * id; keys are ordered first by priority and then by allocation request id.
+ */
+public final class SchedulerRequestKey implements
+ Comparable<SchedulerRequestKey> {
+
+ private final Priority priority;
+ private final long allocationRequestId;
+
+ /**
+ * Factory method to generate a SchedulerRequestKey from a ResourceRequest.
+ * @param req ResourceRequest
+ * @return SchedulerRequestKey
+ */
+ public static SchedulerRequestKey create(ResourceRequest req) {
+ return new SchedulerRequestKey(req.getPriority(),
+ req.getAllocationRequestId());
+ }
+
+ /**
+ * Convenience method to extract the SchedulerRequestKey used to schedule the
+ * Container.
+ * @param container Container
+ * @return SchedulerRequestKey
+ */
+ public static SchedulerRequestKey extractFrom(Container container) {
+ return new SchedulerRequestKey(container.getPriority(),
+ container.getAllocationRequestId());
+ }
+
+ /**
+ * Creates a key from its two components.
+ * @param priority priority of the request, may be null
+ * @param allocationRequestId id of the associated request
+ */
+ SchedulerRequestKey(Priority priority, long allocationRequestId) {
+ this.priority = priority;
+ this.allocationRequestId = allocationRequestId;
+ }
+
+ /**
+ * Get the {@link Priority} of the request.
+ *
+ * @return the {@link Priority} of the request
+ */
+ public Priority getPriority() {
+ return priority;
+ }
+
+ /**
+ * Get the Id of the associated {@link ResourceRequest}.
+ *
+ * @return the Id of the associated {@link ResourceRequest}
+ */
+ public long getAllocationRequestId() {
+ return allocationRequestId;
+ }
+
+ /**
+ * Orders keys first by priority and then by allocationRequestId.
+ * A key without a priority sorts after a key that has one, and two keys
+ * without a priority are ordered by allocationRequestId alone, which keeps
+ * the ordering consistent with {@link #equals(Object)}.
+ *
+ * @param o the key to compare against, may be null
+ * @return a negative, zero or positive value as defined by
+ * {@link Comparable#compareTo(Object)}
+ */
+ @Override
+ public int compareTo(SchedulerRequestKey o) {
+ if (o == null) {
+ return (priority != null) ? -1 : 0;
+ }
+ Priority otherPriority = o.getPriority();
+ if (priority == null) {
+ if (otherPriority != null) {
+ return 1;
+ }
+ // both priorities are absent: fall through to the id comparison
+ } else if (otherPriority == null) {
+ // mirror of the branch above; avoids dereferencing a null priority
+ return -1;
+ } else {
+ int priorityCompare = otherPriority.compareTo(priority);
+ // we first sort by priority and then by allocationRequestId
+ if (priorityCompare != 0) {
+ return priorityCompare;
+ }
+ }
+ return Long.compare(allocationRequestId, o.getAllocationRequestId());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SchedulerRequestKey)) {
+ return false;
+ }
+
+ SchedulerRequestKey that = (SchedulerRequestKey) o;
+
+ if (getAllocationRequestId() != that.getAllocationRequestId()) {
+ return false;
+ }
+ return getPriority() != null ?
+ getPriority().equals(that.getPriority()) :
+ that.getPriority() == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getPriority() != null ? getPriority().hashCode() : 0;
+ result = 31 * result + (int) (getAllocationRequestId() ^ (
+ getAllocationRequestId() >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "SchedulerRequestKey{" +
+ "priority=" + priority +
+ ", allocationRequestId=" + allocationRequestId +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index 0f47c93..a9b5ed4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -227,10 +227,10 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
.partitionAskList(request.getAllocateRequest().getAskList());
// Allocate OPPORTUNISTIC containers.
- request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic());
List<Container> allocatedContainers =
containerAllocator.allocateContainers(
- request.getAllocateRequest(), applicationAttemptId,
+ request.getAllocateRequest().getResourceBlacklistRequest(),
+ partitionedAsks.getOpportunistic(), applicationAttemptId,
oppContainerContext, rmIdentifier, appSubmitter);
// Prepare request for sending to RM for scheduling GUARANTEED containers.
@@ -252,18 +252,11 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
nodeTokens.put(nmToken.getNodeId(), nmToken);
}
- oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());
-
// Check if we have NM tokens for all the allocated containers. If not
// generate one and update the response.
updateAllocateResponse(
dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Number of opportunistic containers currently" +
- "allocated by application: " + oppContainerContext
- .getContainersAllocated().size());
- }
return dsResp;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 3d7b2b1..9fd1845 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -405,7 +405,6 @@ public class ApplicationMasterService extends AbstractService implements
ApplicationAttemptId appAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();
- ApplicationId applicationId = appAttemptId.getApplicationId();
this.amLivelinessMonitor.receivedPing(appAttemptId);
@@ -422,8 +421,10 @@ public class ApplicationMasterService extends AbstractService implements
AllocateResponse lastResponse = lock.getAllocateResponse();
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
- "AM is not registered for known application attempt: " + appAttemptId
- + " or RM had restarted after AM registered . AM should re-register.";
+ "AM is not registered for known application attempt: "
+ + appAttemptId
+ + " or RM had restarted after AM registered. "
+ + " AM should re-register.";
throw new ApplicationMasterNotRegisteredException(message);
}
@@ -438,185 +439,10 @@ public class ApplicationMasterService extends AbstractService implements
throw new InvalidApplicationMasterRequestException(message);
}
- //filter illegal progress values
- float filteredProgress = request.getProgress();
- if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
- || filteredProgress < 0) {
- request.setProgress(0);
- } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
- request.setProgress(1);
- }
-
- // Send the status update to the appAttempt.
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptStatusupdateEvent(appAttemptId, request
- .getProgress()));
-
- List<ResourceRequest> ask = request.getAskList();
- List<ContainerId> release = request.getReleaseList();
-
- ResourceBlacklistRequest blacklistRequest =
- request.getResourceBlacklistRequest();
- List<String> blacklistAdditions =
- (blacklistRequest != null) ?
- blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
- List<String> blacklistRemovals =
- (blacklistRequest != null) ?
- blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
- RMApp app =
- this.rmContext.getRMApps().get(applicationId);
-
- // set label expression for Resource Requests if resourceName=ANY
- ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
- for (ResourceRequest req : ask) {
- if (null == req.getNodeLabelExpression()
- && ResourceRequest.ANY.equals(req.getResourceName())) {
- req.setNodeLabelExpression(asc.getNodeLabelExpression());
- }
- }
-
- Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
-
- // sanity check
- try {
- RMServerUtils.normalizeAndValidateRequests(ask,
- maximumCapacity, app.getQueue(),
- rScheduler, rmContext);
- } catch (InvalidResourceRequestException e) {
- LOG.warn("Invalid resource ask by application " + appAttemptId, e);
- throw e;
- }
-
- try {
- RMServerUtils.validateBlacklistRequest(blacklistRequest);
- } catch (InvalidResourceBlacklistRequestException e) {
- LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
- throw e;
- }
-
- // In the case of work-preserving AM restart, it's possible for the
- // AM to release containers from the earlier attempt.
- if (!app.getApplicationSubmissionContext()
- .getKeepContainersAcrossApplicationAttempts()) {
- try {
- RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
- } catch (InvalidContainerReleaseException e) {
- LOG.warn("Invalid container release by application " + appAttemptId,
- e);
- throw e;
- }
- }
-
- // Split Update Resource Requests into increase and decrease.
- // No Exceptions are thrown here. All update errors are aggregated
- // and returned to the AM.
- List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
- List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
- List<UpdateContainerError> updateContainerErrors =
- RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
- request, maximumCapacity, increaseResourceReqs,
- decreaseResourceReqs);
-
- // Send new requests to appAttempt.
- Allocation allocation;
- RMAppAttemptState state =
- app.getRMAppAttempt(appAttemptId).getAppAttemptState();
- if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
- state.equals(RMAppAttemptState.FINISHING) ||
- app.isAppFinalStateStored()) {
- LOG.warn(appAttemptId + " is in " + state +
- " state, ignore container allocate request.");
- allocation = EMPTY_ALLOCATION;
- } else {
- allocation =
- this.rScheduler.allocate(appAttemptId, ask, release,
- blacklistAdditions, blacklistRemovals,
- increaseResourceReqs, decreaseResourceReqs);
- }
-
- if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
- LOG.info("blacklist are updated in Scheduler." +
- "blacklistAdditions: " + blacklistAdditions + ", " +
- "blacklistRemovals: " + blacklistRemovals);
- }
- RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
- AllocateResponse allocateResponse =
+ AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);
- if (allocation.getNMTokens() != null &&
- !allocation.getNMTokens().isEmpty()) {
- allocateResponse.setNMTokens(allocation.getNMTokens());
- }
-
- // Notify the AM of container update errors
- if (!updateContainerErrors.isEmpty()) {
- allocateResponse.setUpdateErrors(updateContainerErrors);
- }
- // update the response with the deltas of node status changes
- List<RMNode> updatedNodes = new ArrayList<RMNode>();
- if(app.pullRMNodeUpdates(updatedNodes) > 0) {
- List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
- for(RMNode rmNode: updatedNodes) {
- SchedulerNodeReport schedulerNodeReport =
- rScheduler.getNodeReport(rmNode.getNodeID());
- Resource used = BuilderUtils.newResource(0, 0);
- int numContainers = 0;
- if (schedulerNodeReport != null) {
- used = schedulerNodeReport.getUsedResource();
- numContainers = schedulerNodeReport.getNumContainers();
- }
- NodeId nodeId = rmNode.getNodeID();
- NodeReport report =
- BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
- rmNode.getHttpAddress(), rmNode.getRackName(), used,
- rmNode.getTotalCapability(), numContainers,
- rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
- rmNode.getNodeLabels());
-
- updatedNodeReports.add(report);
- }
- allocateResponse.setUpdatedNodes(updatedNodeReports);
- }
-
- allocateResponse.setAllocatedContainers(allocation.getContainers());
- allocateResponse.setCompletedContainersStatuses(appAttempt
- .pullJustFinishedContainers());
- allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
- allocateResponse.setAvailableResources(allocation.getResourceLimit());
-
- // Handling increased/decreased containers
- List<UpdatedContainer> updatedContainers = new ArrayList<>();
- if (allocation.getIncreasedContainers() != null) {
- for (Container c : allocation.getIncreasedContainers()) {
- updatedContainers.add(
- UpdatedContainer.newInstance(
- ContainerUpdateType.INCREASE_RESOURCE, c));
- }
- }
- if (allocation.getDecreasedContainers() != null) {
- for (Container c : allocation.getDecreasedContainers()) {
- updatedContainers.add(
- UpdatedContainer.newInstance(
- ContainerUpdateType.DECREASE_RESOURCE, c));
- }
- }
-
- allocateResponse.setUpdatedContainers(updatedContainers);
-
- allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-
- // add collector address for this application
- if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
- allocateResponse.setCollectorAddr(
- this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
- }
-
- // add preemption to the allocateResponse message (if any)
- allocateResponse
- .setPreemptionMessage(generatePreemptionMessage(allocation));
-
- // Set application priority
- allocateResponse.setApplicationPriority(app
- .getApplicationPriority());
+ allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(),
+ request, response);
// update AMRMToken if the token is rolled-up
MasterKeyData nextMasterKey =
@@ -624,21 +450,24 @@ public class ApplicationMasterService extends AbstractService implements
if (nextMasterKey != null
&& nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
- .getKeyId()) {
+ .getKeyId()) {
+ RMApp app =
+ this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
+ RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;
Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
if (nextMasterKey.getMasterKey().getKeyId() !=
appAttemptImpl.getAMRMTokenKeyId()) {
LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
- + " to application: " + applicationId);
+ + " to application: " + appAttemptId.getApplicationId());
amrmToken = rmContext.getAMRMTokenSecretManager()
.createAndGetAMRMToken(appAttemptId);
appAttemptImpl.setAMRMToken(amrmToken);
}
- allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
- .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
- .toString(), amrmToken.getPassword(), amrmToken.getService()
- .toString()));
+ response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+ .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
+ .toString(), amrmToken.getPassword(), amrmToken.getService()
+ .toString()));
}
/*
@@ -646,11 +475,227 @@ public class ApplicationMasterService extends AbstractService implements
* need to worry about unregister call occurring in between (which
* removes the lock object).
*/
- lock.setAllocateResponse(allocateResponse);
- return allocateResponse;
+ response.setResponseId(lastResponse.getResponseId() + 1);
+ lock.setAllocateResponse(response);
+ return response;
}
}
+ /**
+  * Performs the core of an allocate call for the given application attempt:
+  * sanitizes the reported progress, validates the asks, the blacklist
+  * request, the release list and the container update requests, forwards
+  * them to the scheduler and fills in {@code allocateResponse} with the
+  * outcome. The response id is not set by this method.
+  *
+  * @param appAttemptId attempt on whose behalf the request is processed
+  * @param request the allocate request; its progress and the node label
+  *          expression of its ANY asks may be modified in place
+  * @param allocateResponse response populated by this method
+  * @throws YarnException if the resource asks, the blacklist request or the
+  *           container release request fail validation
+  */
+ protected void allocateInternal(ApplicationAttemptId appAttemptId,
+     AllocateRequest request, AllocateResponse allocateResponse)
+     throws YarnException {
+
+   // Filter illegal progress values: NaN and negative values become 0,
+   // values above 1 become 1. The comparisons also cover both infinities.
+   float filteredProgress = request.getProgress();
+   if (Float.isNaN(filteredProgress) || filteredProgress < 0) {
+     request.setProgress(0);
+   } else if (filteredProgress > 1) {
+     request.setProgress(1);
+   }
+
+   // Send the status update to the appAttempt.
+   this.rmContext.getDispatcher().getEventHandler().handle(
+       new RMAppAttemptStatusupdateEvent(appAttemptId, request
+           .getProgress()));
+
+   List<ResourceRequest> ask = request.getAskList();
+   List<ContainerId> release = request.getReleaseList();
+
+   // A missing blacklist request is treated as "no additions, no removals".
+   ResourceBlacklistRequest blacklistRequest =
+       request.getResourceBlacklistRequest();
+   List<String> blacklistAdditions =
+       (blacklistRequest != null) ?
+           blacklistRequest.getBlacklistAdditions() :
+           Collections.<String>emptyList();
+   List<String> blacklistRemovals =
+       (blacklistRequest != null) ?
+           blacklistRequest.getBlacklistRemovals() :
+           Collections.<String>emptyList();
+   RMApp app =
+       this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
+
+   // set label expression for Resource Requests if resourceName=ANY
+   ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
+   for (ResourceRequest req : ask) {
+     if (null == req.getNodeLabelExpression()
+         && ResourceRequest.ANY.equals(req.getResourceName())) {
+       req.setNodeLabelExpression(asc.getNodeLabelExpression());
+     }
+   }
+
+   Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
+
+   // sanity check
+   try {
+     RMServerUtils.normalizeAndValidateRequests(ask,
+         maximumCapacity, app.getQueue(),
+         rScheduler, rmContext);
+   } catch (InvalidResourceRequestException e) {
+     LOG.warn("Invalid resource ask by application " + appAttemptId, e);
+     throw e;
+   }
+
+   try {
+     RMServerUtils.validateBlacklistRequest(blacklistRequest);
+   } catch (InvalidResourceBlacklistRequestException e) {
+     LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
+     throw e;
+   }
+
+   // In the case of work-preserving AM restart, it's possible for the
+   // AM to release containers from the earlier attempt.
+   if (!asc.getKeepContainersAcrossApplicationAttempts()) {
+     try {
+       RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+     } catch (InvalidContainerReleaseException e) {
+       LOG.warn("Invalid container release by application " + appAttemptId,
+           e);
+       throw e;
+     }
+   }
+
+   // Split Update Resource Requests into increase and decrease.
+   // No Exceptions are thrown here. All update errors are aggregated
+   // and returned to the AM.
+   List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
+   List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
+   List<UpdateContainerError> updateContainerErrors =
+       RMServerUtils.validateAndSplitUpdateResourceRequests(
+           rmContext, request, maximumCapacity,
+           increaseResourceReqs, decreaseResourceReqs);
+
+   // Send new requests to appAttempt, unless the attempt or the app is
+   // already finishing, in which case the request is ignored.
+   RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
+   Allocation allocation;
+   RMAppAttemptState state = appAttempt.getAppAttemptState();
+   if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
+       state.equals(RMAppAttemptState.FINISHING) ||
+       app.isAppFinalStateStored()) {
+     LOG.warn(appAttemptId + " is in " + state +
+         " state, ignore container allocate request.");
+     allocation = EMPTY_ALLOCATION;
+   } else {
+     allocation =
+         this.rScheduler.allocate(appAttemptId, ask, release,
+             blacklistAdditions, blacklistRemovals,
+             increaseResourceReqs, decreaseResourceReqs);
+   }
+
+   if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
+     LOG.info("blacklist are updated in Scheduler. " +
+         "blacklistAdditions: " + blacklistAdditions + ", " +
+         "blacklistRemovals: " + blacklistRemovals);
+   }
+
+   if (allocation.getNMTokens() != null &&
+       !allocation.getNMTokens().isEmpty()) {
+     allocateResponse.setNMTokens(allocation.getNMTokens());
+   }
+
+   // Notify the AM of container update errors
+   addToUpdateContainerErrors(allocateResponse, updateContainerErrors);
+
+   // update the response with the deltas of node status changes
+   List<RMNode> updatedNodes = new ArrayList<>();
+   if (app.pullRMNodeUpdates(updatedNodes) > 0) {
+     List<NodeReport> updatedNodeReports = new ArrayList<>();
+     for (RMNode rmNode : updatedNodes) {
+       SchedulerNodeReport schedulerNodeReport =
+           rScheduler.getNodeReport(rmNode.getNodeID());
+       // Without a scheduler report the node is reported as unused.
+       Resource used = BuilderUtils.newResource(0, 0);
+       int numContainers = 0;
+       if (schedulerNodeReport != null) {
+         used = schedulerNodeReport.getUsedResource();
+         numContainers = schedulerNodeReport.getNumContainers();
+       }
+       NodeId nodeId = rmNode.getNodeID();
+       NodeReport report =
+           BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
+               rmNode.getHttpAddress(), rmNode.getRackName(), used,
+               rmNode.getTotalCapability(), numContainers,
+               rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
+               rmNode.getNodeLabels());
+
+       updatedNodeReports.add(report);
+     }
+     allocateResponse.setUpdatedNodes(updatedNodeReports);
+   }
+
+   addToAllocatedContainers(allocateResponse, allocation.getContainers());
+
+   allocateResponse.setCompletedContainersStatuses(appAttempt
+       .pullJustFinishedContainers());
+   allocateResponse.setAvailableResources(allocation.getResourceLimit());
+
+   // Handling increased containers
+   addToUpdatedContainers(
+       allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
+       allocation.getIncreasedContainers());
+
+   // Handling decreased containers
+   addToUpdatedContainers(
+       allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
+       allocation.getDecreasedContainers());
+
+   allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+
+   // add collector address for this application
+   if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+     allocateResponse.setCollectorAddr(app.getCollectorAddr());
+   }
+
+   // add preemption to the allocateResponse message (if any)
+   allocateResponse
+       .setPreemptionMessage(generatePreemptionMessage(allocation));
+
+   // Set application priority
+   allocateResponse.setApplicationPriority(app
+       .getApplicationPriority());
+ }
+
+ /**
+  * Records the given container update errors on the response. When the
+  * response already carries update errors, a new list holding the given
+  * errors followed by the existing ones is set instead. Nothing happens when
+  * {@code updateContainerErrors} is empty.
+  *
+  * @param allocateResponse response to update
+  * @param updateContainerErrors errors to report to the AM
+  */
+ protected void addToUpdateContainerErrors(AllocateResponse allocateResponse,
+     List<UpdateContainerError> updateContainerErrors) {
+   if (updateContainerErrors.isEmpty()) {
+     return;
+   }
+   List<UpdateContainerError> errorsToSet = updateContainerErrors;
+   List<UpdateContainerError> alreadyReported =
+       allocateResponse.getUpdateErrors();
+   if (alreadyReported != null && !alreadyReported.isEmpty()) {
+     // Copy first so the caller's list is left untouched.
+     errorsToSet = new ArrayList<>(updateContainerErrors);
+     errorsToSet.addAll(alreadyReported);
+   }
+   allocateResponse.setUpdateErrors(errorsToSet);
+ }
+
+ /**
+  * Wraps each of the given containers in an {@link UpdatedContainer} of the
+  * given type and sets them on the response, placed after any updated
+  * containers the response already carries. Nothing happens when
+  * {@code updatedContainers} is null or empty.
+  *
+  * @param allocateResponse response to update
+  * @param updateType update type recorded for every container in the list
+  * @param updatedContainers containers to add, may be null
+  */
+ protected void addToUpdatedContainers(AllocateResponse allocateResponse,
+     ContainerUpdateType updateType, List<Container> updatedContainers) {
+   if (updatedContainers == null || updatedContainers.isEmpty()) {
+     return;
+   }
+   ArrayList<UpdatedContainer> merged = new ArrayList<>();
+   List<UpdatedContainer> alreadySet =
+       allocateResponse.getUpdatedContainers();
+   if (alreadySet != null && !alreadySet.isEmpty()) {
+     merged.addAll(alreadySet);
+   }
+   for (Container container : updatedContainers) {
+     merged.add(UpdatedContainer.newInstance(updateType, container));
+   }
+   allocateResponse.setUpdatedContainers(merged);
+ }
+
+ /**
+  * Sets the given allocated containers on the response. When the response
+  * already carries allocated containers, a new list holding the given
+  * containers followed by the existing ones is set instead.
+  *
+  * @param allocateResponse response to update
+  * @param allocatedContainers newly allocated containers; a null list is
+  *          tolerated when the response already carries containers, in
+  *          which case the response is left as it is
+  */
+ protected void addToAllocatedContainers(AllocateResponse allocateResponse,
+     List<Container> allocatedContainers) {
+   List<Container> containersToSet = allocatedContainers;
+   List<Container> existing = allocateResponse.getAllocatedContainers();
+   if (existing != null && !existing.isEmpty()) {
+     if (allocatedContainers == null) {
+       // Nothing to merge; copying a null list would throw an NPE.
+       return;
+     }
+     // Copy first so the caller's list is left untouched.
+     containersToSet = new ArrayList<>(allocatedContainers);
+     containersToSet.addAll(existing);
+   }
+   allocateResponse.setAllocatedContainers(containersToSet);
+ }
+
private PreemptionMessage generatePreemptionMessage(Allocation allocation){
PreemptionMessage pMsg = null;
// assemble strict preemption request
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index a527d04..9d4c092 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -101,8 +101,8 @@ public class OpportunisticContainerAllocatorAMService
private final int k;
private final long cacheRefreshInterval;
- private List<RemoteNode> cachedNodes;
- private long lastCacheUpdateTime;
+ private volatile List<RemoteNode> cachedNodes;
+ private volatile long lastCacheUpdateTime;
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
YarnScheduler scheduler) {
@@ -218,8 +218,9 @@ public class OpportunisticContainerAllocatorAMService
}
@Override
- public AllocateResponse allocate(AllocateRequest request) throws
- YarnException, IOException {
+ protected void allocateInternal(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse allocateResponse)
+ throws YarnException {
// Partition requests to GUARANTEED and OPPORTUNISTIC.
OpportunisticContainerAllocator.PartitionedResourceRequests
@@ -227,40 +228,30 @@ public class OpportunisticContainerAllocatorAMService
oppContainerAllocator.partitionAskList(request.getAskList());
// Allocate OPPORTUNISTIC containers.
- request.setAskList(partitionedAsks.getOpportunistic());
- final ApplicationAttemptId appAttemptId = getAppAttemptId();
- SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
- rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
+ SchedulerApplicationAttempt appAttempt =
+ ((AbstractYarnScheduler)rmContext.getScheduler())
+ .getApplicationAttempt(appAttemptId);
OpportunisticContainerContext oppCtx =
appAttempt.getOpportunisticContainerContext();
oppCtx.updateNodeList(getLeastLoadedNodes());
List<Container> oppContainers =
- oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
- ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
+ oppContainerAllocator.allocateContainers(
+ request.getResourceBlacklistRequest(),
+ partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
+ ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
// Create RMContainers and update the NMTokens.
if (!oppContainers.isEmpty()) {
handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers);
+ addToAllocatedContainers(allocateResponse, oppContainers);
}
// Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed());
- AllocateResponse allocateResp = super.allocate(request);
-
- // Add allocated OPPORTUNISTIC containers to the AllocateResponse.
- if (!oppContainers.isEmpty()) {
- allocateResp.getAllocatedContainers().addAll(oppContainers);
- }
-
- // Update opportunistic container context with the allocated GUARANTEED
- // containers.
- oppCtx.updateCompletedContainers(allocateResp);
-
- // Add all opportunistic containers
- return allocateResp;
+ super.allocateInternal(appAttemptId, request, allocateResponse);
}
@Override
@@ -304,7 +295,7 @@ public class OpportunisticContainerAllocatorAMService
}
private void handleNewContainers(List<Container> allocContainers,
- boolean isRemotelyAllocated) {
+ boolean isRemotelyAllocated) {
for (Container container : allocContainers) {
// Create RMContainer
SchedulerApplicationAttempt appAttempt =
@@ -387,10 +378,12 @@ public class OpportunisticContainerAllocatorAMService
private synchronized List<RemoteNode> getLeastLoadedNodes() {
long currTime = System.currentTimeMillis();
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
- || cachedNodes == null) {
+ || (cachedNodes == null)) {
cachedNodes = convertToRemoteNodes(
this.nodeMonitor.selectLeastLoadedNodes(this.k));
- lastCacheUpdateTime = currTime;
+ if (cachedNodes.size() > 0) {
+ lastCacheUpdateTime = currTime;
+ }
}
return cachedNodes;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index a244ad8..020764b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -25,14 +25,13 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 0cfa1d3..dbc6169 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.RMNodeDecreaseContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
index d7d1e94..80e7c0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
/**
* The event signifying that a container has been reserved.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index feb20ee..30f7ef9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -965,6 +967,7 @@ public class AppSchedulingInfo {
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
ResourceRequest newRequest = ResourceRequest.newBuilder()
.priority(request.getPriority())
+ .allocationRequestId(request.getAllocationRequestId())
.resourceName(request.getResourceName())
.capability(request.getCapability())
.numContainers(1)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index b3ef471..e94d800 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -73,12 +72,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 6744c2e..759db05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.ImmutableSet;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
deleted file mode 100644
index 4b640ae..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-
-/**
- * Composite key for outstanding scheduler requests for any schedulable entity.
- * Currently it includes {@link Priority}.
- */
-public final class SchedulerRequestKey implements
- Comparable<SchedulerRequestKey> {
-
- private final Priority priority;
- private final long allocationRequestId;
-
- /**
- * Factory method to generate a SchedulerRequestKey from a ResourceRequest.
- * @param req ResourceRequest
- * @return SchedulerRequestKey
- */
- public static SchedulerRequestKey create(ResourceRequest req) {
- return new SchedulerRequestKey(req.getPriority(),
- req.getAllocationRequestId());
- }
-
- /**
- * Convenience method to extract the SchedulerRequestKey used to schedule the
- * Container.
- * @param container Container
- * @return SchedulerRequestKey
- */
- public static SchedulerRequestKey extractFrom(Container container) {
- return new SchedulerRequestKey(container.getPriority(),
- container.getAllocationRequestId());
- }
-
- private SchedulerRequestKey(Priority priority, long allocationRequestId) {
- this.priority = priority;
- this.allocationRequestId = allocationRequestId;
- }
-
- /**
- * Get the {@link Priority} of the request.
- *
- * @return the {@link Priority} of the request
- */
- public Priority getPriority() {
- return priority;
- }
-
- /**
- * Get the Id of the associated {@link ResourceRequest}.
- *
- * @return the Id of the associated {@link ResourceRequest}
- */
- public long getAllocationRequestId() {
- return allocationRequestId;
- }
-
- @Override
- public int compareTo(SchedulerRequestKey o) {
- if (o == null) {
- return (priority != null) ? -1 : 0;
- } else {
- if (priority == null) {
- return 1;
- }
- }
- int priorityCompare = o.getPriority().compareTo(priority);
- // we first sort by priority and then by allocationRequestId
- if (priorityCompare != 0) {
- return priorityCompare;
- }
- return Long.compare(allocationRequestId, o.getAllocationRequestId());
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof SchedulerRequestKey)) {
- return false;
- }
-
- SchedulerRequestKey that = (SchedulerRequestKey) o;
-
- if (getAllocationRequestId() != that.getAllocationRequestId()) {
- return false;
- }
- return getPriority() != null ?
- getPriority().equals(that.getPriority()) :
- that.getPriority() == null;
- }
-
- @Override
- public int hashCode() {
- int result = getPriority() != null ? getPriority().hashCode() : 0;
- result = 31 * result + (int) (getAllocationRequestId() ^ (
- getAllocationRequestId() >>> 32));
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 1c6471f..93c0693 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -2204,7 +2205,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
- if (application != null) {
+ if (application != null && rmContainer != null
+ && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer()
@@ -2222,7 +2224,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
- if (application != null) {
+ if (application != null && rmContainer != null
+ && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
index 74a64c1..0dc527f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 3e8282f..c12bc6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
index 8b4907b..159fb09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
/**
* Contexts for a container inside scheduler
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 498f34f..b14bc20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
@@ -69,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index d79fcaf..344daf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index dec55ca..fb67270 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -61,9 +60,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
@Override
public int compare(ClusterNode o1, ClusterNode o2) {
if (getMetric(o1) == getMetric(o2)) {
- return o1.timestamp < o2.timestamp ? +1 : -1;
+ return Long.compare(o2.timestamp, o1.timestamp); // tie-break: newer timestamp sorts first; avoids the lossy (int) cast of a difference
}
- return getMetric(o1) > getMetric(o2) ? +1 : -1;
+ return Integer.compare(getMetric(o1), getMetric(o2)); // ascending by metric; avoids int-subtraction overflow
}
public int getMetric(ClusterNode c) {
@@ -115,8 +114,13 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock();
writeLock.lock();
try {
- sortedNodes.clear();
- sortedNodes.addAll(sortNodes());
+ try {
+ List<NodeId> nodeIds = sortNodes();
+ sortedNodes.clear();
+ sortedNodes.addAll(nodeIds);
+ } catch (Exception ex) {
+ LOG.warn("Got Exception while sorting nodes..", ex);
+ }
if (thresholdCalculator != null) {
thresholdCalculator.update();
}
@@ -273,7 +277,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
List<NodeId> retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ?
new ArrayList<>(this.sortedNodes).subList(0, k) :
new ArrayList<>(this.sortedNodes);
- return Collections.unmodifiableList(retVal);
+ return retVal;
} finally {
readLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 39f4a3d..a9591a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac1e5d4f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index a27a222..d983ea0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import java.util.Collection;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org