You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 17:56:26 UTC
[2/4] hadoop git commit: pre-commit
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
new file mode 100644
index 0000000..50d8118
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy
+ .AMRMProxyApplicationContext;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
+
+
+
+import org.apache.hadoop.yarn.server.nodemanager.security
+ .NMTokenSecretManagerInNM;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * <p>The LocalScheduler runs on the NodeManager and is modelled as an
+ * <code>AMRMProxy</code> request interceptor. It is responsible for the
+ * following :
+ * <ul>
+ * <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
+ * response objects to extract instructions from the
+ * <code>ClusterManager</code> running on the ResourceManager to aid in making
+ * scheduling decisions</li>
+ * <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
+ * containers for the opportunistic resource outstandingOpReqs</li>
+ * </ul>
+ * </p>
+ */
+public final class LocalScheduler extends AbstractRequestInterceptor {
+
+ static class PartitionedResourceRequests {
+ private List<ResourceRequest> guaranteed = new ArrayList<>();
+ private List<ResourceRequest> opportunistic = new ArrayList<>();
+ public List<ResourceRequest> getGuaranteed() {
+ return guaranteed;
+ }
+ public List<ResourceRequest> getOpportunistic() {
+ return opportunistic;
+ }
+ }
+
+ static class DistSchedulerParams {
+ Resource maxResource;
+ Resource minResource;
+ Resource incrementResource;
+ int containerTokenExpiryInterval;
+ }
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(LocalScheduler.class);
+
+ // Currently just used to keep track of allocated Containers
+ // Can be used for reporting stats later
+ private Set<ContainerId> containersAllocated = new HashSet<>();
+
+ private DistSchedulerParams appParams = new DistSchedulerParams();
+ private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
+ new OpportunisticContainerAllocator.ContainerIdCounter();
+ private Map<String, NodeId> nodeList = new HashMap<>();
+
+ // Mapping of NodeId to NodeTokens. Populated either from RM response or
+ // generated locally if required.
+ private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
+ final Set<String> blacklist = new HashSet<>();
+
+ // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
+ // Resource Name (Host/rack/any) and capability. This mapping is required
+ // to match a received Container to an outstanding OPPORTUNISTIC
+ // ResourceRequests (ask)
+ final TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>
+ outstandingOpReqs = new TreeMap<>();
+
+ private ApplicationAttemptId applicationAttemptId;
+ private OpportunisticContainerAllocator containerAllocator;
+ private NMTokenSecretManagerInNM nmSecretManager;
+ private String appSubmitter;
+
+ public void init(AMRMProxyApplicationContext appContext) {
+ super.init(appContext);
+ initLocal(appContext.getApplicationAttemptId(),
+ appContext.getNMCotext().getContainerAllocator(),
+ appContext.getNMCotext().getNMTokenSecretManager(),
+ appContext.getUser());
+ }
+
+ @VisibleForTesting
+ void initLocal(ApplicationAttemptId applicationAttemptId,
+ OpportunisticContainerAllocator containerAllocator,
+ NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
+ this.applicationAttemptId = applicationAttemptId;
+ this.containerAllocator = containerAllocator;
+ this.nmSecretManager = nmSecretManager;
+ this.appSubmitter = appSubmitter;
+ }
+
+ /**
+ * Route register call to the corresponding distributed scheduling method viz.
+ * registerApplicationMasterForDistributedScheduling, and return response to
+ * the caller after stripping away Distributed Scheduling information.
+ *
+ * @param request
+ * registration request
+ * @return
+ * @throws YarnException
+ * @throws IOException
+ */
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster
+ (RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return registerApplicationMasterForDistributedScheduling(request)
+ .getRegisterResponse();
+ }
+
+ /**
+ * Route allocate call to the allocateForDistributedScheduling method and
+ * return response to the caller after stripping away Distributed Scheduling
+ * information.
+ *
+ * @param request
+ * allocation request
+ * @return
+ * @throws YarnException
+ * @throws IOException
+ */
+ @Override
+ public AllocateResponse allocate(AllocateRequest request) throws
+ YarnException, IOException {
+ return allocateForDistributedScheduling(request).getAllocateResponse();
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster
+ (FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return getNextInterceptor().finishApplicationMaster(request);
+ }
+
+ /**
+ * Check if we already have a NMToken. if Not, generate the Token and
+ * add it to the response
+ * @param response
+ * @param nmTokens
+ * @param allocatedContainers
+ */
+ private void updateResponseWithNMTokens(AllocateResponse response,
+ List<NMToken> nmTokens, List<Container> allocatedContainers) {
+ List<NMToken> newTokens = new ArrayList<>();
+ if (allocatedContainers.size() > 0) {
+ response.getAllocatedContainers().addAll(allocatedContainers);
+ for (Container alloc : allocatedContainers) {
+ if (!nodeTokens.containsKey(alloc.getNodeId())) {
+ newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
+ }
+ }
+ List<NMToken> retTokens = new ArrayList<>(nmTokens);
+ retTokens.addAll(newTokens);
+ response.setNMTokens(retTokens);
+ }
+ }
+
+ private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
+ askList) {
+ PartitionedResourceRequests partitionedRequests =
+ new PartitionedResourceRequests();
+ for (ResourceRequest rr : askList) {
+ if (rr.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ partitionedRequests.getOpportunistic().add(rr);
+ } else {
+ partitionedRequests.getGuaranteed().add(rr);
+ }
+ }
+ return partitionedRequests;
+ }
+
+ private void updateParameters(
+ DistSchedRegisterResponse registerResponse) {
+ appParams.minResource = registerResponse.getMinAllocatableCapabilty();
+ appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
+ appParams.incrementResource =
+ registerResponse.getIncrAllocatableCapabilty();
+ if (appParams.incrementResource == null) {
+ appParams.incrementResource = appParams.minResource;
+ }
+ appParams.containerTokenExpiryInterval = registerResponse
+ .getContainerTokenExpiryInterval();
+
+ containerIdCounter
+ .resetContainerIdCounter(registerResponse.getContainerIdStart());
+ setNodeList(registerResponse.getNodesForScheduling());
+ }
+
+ /**
+ * Takes a list of ResourceRequests (asks), extracts the key information viz.
+ * (Priority, ResourceName, Capability) and adds it the outstanding
+ * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
+ * the current YARN constraint that only a single ResourceRequest can exist at
+ * a give Priority and Capability
+ * @param resourceAsks
+ */
+ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
+ for (ResourceRequest request : resourceAsks) {
+ // Handling locality for opportunistic tokens is optional; we rely on
+ // "anyAsk" to drive allocations
+ Priority priority = request.getPriority();
+ String resourceName = request.getResourceName();
+
+ if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
+ continue;
+ }
+
+ if (request.getNumContainers() == 0) {
+ continue;
+ }
+
+ Map<String, Map<Resource, ResourceRequest>> asks =
+ this.outstandingOpReqs.get(priority);
+ if (asks == null) {
+ asks = new HashMap<>();
+ this.outstandingOpReqs.put(priority, asks);
+ }
+
+ Map<Resource, ResourceRequest> reqMap = asks.get(resourceName);
+ if (reqMap == null) {
+ reqMap = new HashMap<>();
+ asks.put(resourceName, reqMap);
+ }
+
+ ResourceRequest resourceRequest = reqMap.get(request.getCapability());
+ if (resourceRequest == null) {
+ resourceRequest = request;
+ reqMap.put(request.getCapability(), request);
+ } else {
+ resourceRequest.setNumContainers(
+ resourceRequest.getNumContainers() + request.getNumContainers());
+ }
+ if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+ LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+ + ", with capability = " + request.getCapability() + " ) : "
+ + resourceRequest.getNumContainers());
+ }
+ }
+ }
+
+ /**
+ * This method matches a returned list of Container Allocations to any
+ * outstanding OPPORTUNISTIC ResourceRequest
+ * @param capability
+ * @param allocatedContainers
+ */
+ public void matchAllocationToOutstandingRequest(Resource capability,
+ List<Container> allocatedContainers) {
+ for (Container c : allocatedContainers) {
+ containersAllocated.add(c.getId());
+ Map<String, Map<Resource, ResourceRequest>> asks = this
+ .outstandingOpReqs.get(c.getPriority());
+
+ if (asks == null)
+ continue;
+
+ // Host specific accounting
+ removeFromReqMap(capability, asks.get(c.getNodeId().getHost()));
+
+ // any ask accounting
+ removeFromReqMap(capability, asks.get(ResourceRequest.ANY));
+ }
+ }
+
+ private void removeFromReqMap(Resource capability, Map<Resource,
+ ResourceRequest> rrMap) {
+ if (rrMap != null) {
+ ResourceRequest rr = rrMap.get(capability);
+ if (rr != null) {
+ rr.setNumContainers(rr.getNumContainers() - 1);
+ if (rr.getNumContainers() == 0)
+ rrMap.remove(capability);
+ }
+ }
+ }
+
+ private void setNodeList(List<NodeId> nodeList) {
+ this.nodeList.clear();
+ addToNodeList(nodeList);
+ }
+
+ private void addToNodeList(List<NodeId> nodes) {
+ for (NodeId n : nodes) {
+ this.nodeList.put(n.getHost(), n);
+ }
+ }
+
+ @Override
+ public DistSchedRegisterResponse
+ registerApplicationMasterForDistributedScheduling
+ (RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ LOG.info("Forwarding registration request to the" +
+ "Distributed Scheduler Service on YARN RM");
+ DistSchedRegisterResponse dsResp = getNextInterceptor()
+ .registerApplicationMasterForDistributedScheduling(request);
+ updateParameters(dsResp);
+ return dsResp;
+ }
+
+ @Override
+ public DistSchedAllocateResponse allocateForDistributedScheduling
+ (AllocateRequest request) throws YarnException, IOException {
+ LOG.info("Forwarding allocate request to the" +
+ "Distributed Scheduler Service on YARN RM");
+ // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
+ PartitionedResourceRequests partitionedAsks = partitionAskList(request
+ .getAskList());
+
+ List<ContainerId> releasedContainers = request.getReleaseList();
+ int numReleasedContainers = releasedContainers.size();
+ if (numReleasedContainers > 0) {
+ LOG.info("AttemptID: " + applicationAttemptId + " released: "
+ + numReleasedContainers);
+ containersAllocated.removeAll(releasedContainers);
+ }
+
+ // Also, update black list
+ ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+ if (rbr != null) {
+ blacklist.removeAll(rbr.getBlacklistRemovals());
+ blacklist.addAll(rbr.getBlacklistAdditions());
+ }
+
+ // Add OPPORTUNISTIC reqs to the outstanding reqs
+ addToOutstandingReqs(partitionedAsks.getOpportunistic());
+
+ List<Container> allocatedContainers = new ArrayList<>();
+ for (Priority priority : outstandingOpReqs.descendingKeySet()) {
+ for (Map<Resource, ResourceRequest> reqMap :
+ outstandingOpReqs.get(priority).values()) {
+ // Allocated containers :
+ // Key = Requested Capability,
+ // Value = List of Containers of given Cap (The actual container size
+ // might be different than what is requested.. which is why
+ // we need the requested capability (key) to match against
+ // the outstanding reqs)
+ Map<Resource, List<Container>> allocated =
+ containerAllocator.allocate(this.appParams, containerIdCounter,
+ reqMap.values(), blacklist, applicationAttemptId, nodeList,
+ appSubmitter);
+ for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
+ matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
+ allocatedContainers.addAll(e.getValue());
+ }
+ }
+ }
+
+ // Send all the GUARANTEED Reqs to RM
+ request.setAskList(partitionedAsks.getGuaranteed());
+ DistSchedAllocateResponse dsResp =
+ getNextInterceptor().allocateForDistributedScheduling(request);
+
+ // Update host to nodeId mapping
+ setNodeList(dsResp.getNodesForScheduling());
+ List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
+ for (NMToken nmToken : nmTokens) {
+ nodeTokens.put(nmToken.getNodeId(), nmToken);
+ }
+
+ List<ContainerStatus> completedContainers =
+ dsResp.getAllocateResponse().getCompletedContainersStatuses();
+
+ // Only account for opportunistic containers
+ for (ContainerStatus cs : completedContainers) {
+ if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ containersAllocated.remove(cs.getContainerId());
+ }
+ }
+
+ // Check if we have NM tokens for all the allocated containers. If not
+ // generate one and update the response.
+ updateResponseWithNMTokens(
+ dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Number of opportunistic containers currently allocated by" +
+ "application: " + containersAllocated.size());
+ }
+ return dsResp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
new file mode 100644
index 0000000..1fd6529
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <p>The OpportunisticContainerAllocator allocates containers on a given list
+ * of Nodes after it modifies the container sizes to within allowable limits
+ * specified by the <code>ClusterManager</code> running on the RM. It tries to
+ * distribute the containers as evenly as possible. It also uses the
+ * <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens for
+ * the allocated containers</p>
+ */
+public class OpportunisticContainerAllocator {
+
+ private static final Log LOG =
+ LogFactory.getLog(OpportunisticContainerAllocator.class);
+
+ private static final ResourceCalculator RESOURCE_CALCULATOR =
+ new DominantResourceCalculator();
+
+ static class ContainerIdCounter {
+ final AtomicLong containerIdCounter = new AtomicLong(1);
+
+ void resetContainerIdCounter(long containerIdStart) {
+ this.containerIdCounter.set(containerIdStart);
+ }
+
+ long generateContainerId() {
+ return this.containerIdCounter.decrementAndGet();
+ }
+ }
+
+ private final NodeStatusUpdater nodeStatusUpdater;
+ private final Context context;
+ private int webpagePort;
+
+ public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
+ Context context, int webpagePort) {
+ this.nodeStatusUpdater = nodeStatusUpdater;
+ this.context = context;
+ this.webpagePort = webpagePort;
+ }
+
+ public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
+ ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks,
+ Set<String> blacklist, ApplicationAttemptId appAttId,
+ Map<String, NodeId> allNodes, String userName) throws YarnException {
+ Map<Resource, List<Container>> containers = new HashMap<>();
+ Set<String> nodesAllocated = new HashSet<>();
+ List<ResourceRequest> anyAsks = new ArrayList<>(resourceAsks);
+ for (ResourceRequest anyAsk : anyAsks) {
+ allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
+ allNodes, userName, containers, nodesAllocated, anyAsk);
+ }
+ if (resourceAsks.size() > 0) {
+ LOG.info("Opportunistic allocation requested for: " + resourceAsks.size()
+ + " containers; allocated = " + containers.size());
+ }
+ return containers;
+ }
+
+ private void allocateOpportunisticContainers(DistSchedulerParams appParams,
+ ContainerIdCounter idCounter, Set<String> blacklist,
+ ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName,
+ Map<Resource, List<Container>> containers, Set<String> nodesAllocated,
+ ResourceRequest anyAsk) throws YarnException {
+ int toAllocate = anyAsk.getNumContainers()
+ - (containers.isEmpty() ?
+ 0 : containers.get(anyAsk.getCapability()).size());
+
+ List<String> topKNodesLeft = new ArrayList<>();
+ for (String s : allNodes.keySet()) {
+ // Bias away from whatever we have already allocated and respect blacklist
+ if (nodesAllocated.contains(s) || blacklist.contains(s))
+ continue;
+ topKNodesLeft.add(s);
+ }
+ int numAllocated = 0;
+ int nextNodeToAllocate = 0;
+ for (int numCont = 0; numCont < toAllocate; numCont++) {
+ String topNode = topKNodesLeft.get(nextNodeToAllocate);
+ nextNodeToAllocate++;
+ nextNodeToAllocate %= topKNodesLeft.size();
+ NodeId nodeId = allNodes.get(topNode);
+ Container container = buildContainer(appParams, idCounter, anyAsk, id,
+ userName, nodeId);
+ List<Container> cList = containers.get(anyAsk.getCapability());
+ if (cList == null) {
+ cList = new ArrayList<>();
+ containers.put(anyAsk.getCapability(), cList);
+ }
+ cList.add(container);
+ numAllocated++;
+ LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+ }
+ }
+
+ private Container buildContainer(DistSchedulerParams appParams,
+ ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
+ String userName, NodeId nodeId) throws YarnException {
+ ContainerId cId =
+ ContainerId.newContainerId(id, idCounter.generateContainerId());
+
+ // Normalize the resource asks (Similar to what the the RM scheduler does
+ // before accepting an ask)
+ Resource capability = normalizeCapability(appParams, rr);
+
+ long currTime = System.currentTimeMillis();
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(
+ cId, nodeId.getHost(), userName, capability,
+ currTime + appParams.containerTokenExpiryInterval,
+ context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
+ nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
+ null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+ ExecutionType.OPPORTUNISTIC);
+ byte[] pwd =
+ context.getContainerTokenSecretManager().createPassword(
+ containerTokenIdentifier);
+ Token containerToken = newContainerToken(nodeId, pwd,
+ containerTokenIdentifier);
+ Container container = BuilderUtils.newContainer(
+ cId, nodeId, nodeId.getHost() + ":" + webpagePort,
+ capability, rr.getPriority(), containerToken);
+ return container;
+ }
+
+ private Resource normalizeCapability(DistSchedulerParams appParams,
+ ResourceRequest ask) {
+ return Resources.normalize(RESOURCE_CALCULATOR,
+ ask.getCapability(), appParams.minResource, appParams.maxResource,
+ appParams.incrementResource);
+ }
+
+ public static Token newContainerToken(NodeId nodeId, byte[] password,
+ ContainerTokenIdentifier tokenIdentifier) {
+ // RPC layer client expects ip:port as service for tokens
+ InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
+ nodeId.getPort());
+ // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+ Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
+ ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+ .buildTokenService(addr).toString());
+ return containerToken;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
index f6169e7..86cce35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
@@ -29,7 +29,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@@ -50,7 +53,7 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap;
private final NMStateStoreService stateStore;
- private NodeId nodeId;
+ private NodeId nodeId;
public NMTokenSecretManagerInNM() {
this(new NMNullStateStoreService());
@@ -276,4 +279,23 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
LOG.error("Unable to remove master key for application " + attempt, e);
}
}
+
  /**
   * Used by the Distributed Scheduling framework to generate an NMToken for
   * a container that was allocated locally on the NodeManager (normally the
   * RM issues NMTokens; locally allocated containers need one minted here).
   * @param applicationSubmitter user the application was submitted as
   * @param container the locally allocated container the token is for
   * @return NMToken bound to the container's node
   */
  public NMToken generateNMToken(
      String applicationSubmitter, Container container) {
    // Read lock only: token creation reads the current master-key state but
    // does not modify it.
    this.readLock.lock();
    try {
      Token token =
          createNMToken(container.getId().getApplicationAttemptId(),
              container.getNodeId(), applicationSubmitter);
      return NMToken.newInstance(container.getNodeId(), token);
    } finally {
      this.readLock.unlock();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
index 3dc62bc..6885804 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
@@ -80,7 +80,7 @@ public class TestEventFlow {
Context context = new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null, null,
- new NMNullStateStoreService()) {
+ new NMNullStateStoreService(), false) {
@Override
public int getHttpPort() {
return 1234;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 0d85057..ae9eba2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -1576,7 +1576,7 @@ public class TestNodeStatusUpdater {
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
- NMStateStoreService store) {
+ NMStateStoreService store, boolean isDistributedSchedulingEnabled) {
return new MyNMContext(containerTokenSecretManager,
nmTokenSecretManager);
}
@@ -1811,7 +1811,7 @@ public class TestNodeStatusUpdater {
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
super(containerTokenSecretManager, nmTokenSecretManager, null, null,
- new NMNullStateStoreService());
+ new NMNullStateStoreService(), false);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 9bc23f6..e1ffd88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -678,5 +679,14 @@ public abstract class BaseAMRMProxyTest {
return null;
}
+ @Override
+ public boolean isDistributedSchedulingEnabled() {
+ return false;
+ }
+
+ @Override
+ public OpportunisticContainerAllocator getContainerAllocator() {
+ return null;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index c902fd5..d070bbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -110,7 +110,7 @@ public abstract class BaseContainerManagerTest {
protected Configuration conf = new YarnConfiguration();
protected Context context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
+ new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
public int getHttpPort() {
return HTTP_PORT;
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 2e014de..dfb7a1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -471,7 +471,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
NMStateStoreService stateStore) {
NMContext context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), stateStore){
+ new ApplicationACLsManager(conf), stateStore, false){
public int getHttpPort() {
return HTTP_PORT;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
index 1169c68..cf7ca8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
@@ -113,7 +113,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
private static final String INVALID_JAVA_HOME = "/no/jvm/here";
protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
+ new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
public int getHttpPort() {
return HTTP_PORT;
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
index 9e08b7f..c768df1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
@@ -81,7 +81,8 @@ public class TestLocalCacheDirectoryManager {
NMContext nmContext =
new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), new NMNullStateStoreService());
+ new ApplicationACLsManager(conf), new NMNullStateStoreService(),
+ false);
ResourceLocalizationService service =
new ResourceLocalizationService(null, null, null, null, nmContext);
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 596f784..5fc71c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -186,7 +186,7 @@ public class TestResourceLocalizationService {
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
nmContext = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), new NMNullStateStoreService());
+ new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
}
@After
@@ -2365,7 +2365,7 @@ public class TestResourceLocalizationService {
NMContext nmContext =
new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), stateStore);
+ new ApplicationACLsManager(conf), stateStore, false);
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
new file mode 100644
index 0000000..efc682a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.security
+ .NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security
+ .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestLocalScheduler {
+
+ @Test
+ public void testLocalScheduler() throws Exception {
+
+ Configuration conf = new Configuration();
+ LocalScheduler localScheduler = new LocalScheduler();
+
+ NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
+ Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
+ Context context = Mockito.mock(Context.class);
+ NMContainerTokenSecretManager nmContainerTokenSecretManager = new
+ NMContainerTokenSecretManager(conf);
+ MasterKey mKey = new MasterKey() {
+ @Override
+ public int getKeyId() {
+ return 1;
+ }
+ @Override
+ public void setKeyId(int keyId) {}
+ @Override
+ public ByteBuffer getBytes() {
+ return ByteBuffer.allocate(8);
+ }
+ @Override
+ public void setBytes(ByteBuffer bytes) {}
+ };
+ nmContainerTokenSecretManager.setMasterKey(mKey);
+ Mockito.when(context.getContainerTokenSecretManager()).thenReturn
+ (nmContainerTokenSecretManager);
+ OpportunisticContainerAllocator containerAllocator =
+ new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
+
+ NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
+ new NMTokenSecretManagerInNM();
+ nmTokenSecretManagerInNM.setMasterKey(mKey);
+ localScheduler.initLocal(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
+ containerAllocator, nmTokenSecretManagerInNM, "test");
+
+ RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
+ localScheduler.setNextInterceptor(finalReqIntcptr);
+
+ DistSchedRegisterResponse distSchedRegisterResponse =
+ Records.newRecord(DistSchedRegisterResponse.class);
+ distSchedRegisterResponse.setRegisterResponse(
+ Records.newRecord(RegisterApplicationMasterResponse.class));
+ distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
+ distSchedRegisterResponse.setContainerIdStart(0);
+ distSchedRegisterResponse.setMaxAllocatableCapabilty(
+ Resource.newInstance(1024, 4));
+ distSchedRegisterResponse.setMinAllocatableCapabilty(
+ Resource.newInstance(512, 2));
+ distSchedRegisterResponse.setNodesForScheduling(Arrays.asList(
+ NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
+ Mockito.when(
+ finalReqIntcptr.registerApplicationMasterForDistributedScheduling(
+ Mockito.any(RegisterApplicationMasterRequest.class)))
+ .thenReturn(distSchedRegisterResponse);
+
+ localScheduler.registerApplicationMaster(
+ Records.newRecord(RegisterApplicationMasterRequest.class));
+
+ Mockito.when(
+ finalReqIntcptr.allocateForDistributedScheduling(
+ Mockito.any(AllocateRequest.class)))
+ .thenAnswer(new Answer<DistSchedAllocateResponse>() {
+ @Override
+ public DistSchedAllocateResponse answer(InvocationOnMock
+ invocationOnMock) throws Throwable {
+ return createAllocateResponse(Arrays.asList(
+ NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
+ }
+ });
+
+ AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+ ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class);
+ guaranteedReq.setExecutionType(ExecutionType.GUARANTEED);
+ guaranteedReq.setNumContainers(5);
+ guaranteedReq.setCapability(Resource.newInstance(2048, 2));
+ guaranteedReq.setRelaxLocality(true);
+ guaranteedReq.setResourceName("*");
+ ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
+ opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+ opportunisticReq.setNumContainers(4);
+ opportunisticReq.setCapability(Resource.newInstance(1024, 4));
+ opportunisticReq.setPriority(Priority.newInstance(100));
+ opportunisticReq.setRelaxLocality(true);
+ opportunisticReq.setResourceName("*");
+ allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+
+ // Verify 4 containers were allocated
+ AllocateResponse allocateResponse = localScheduler.allocate(allocateRequest);
+ Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());
+
+ // Verify equal distribution on hosts a and b
+ // And None on c and d
+ Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse);
+ Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
+ Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
+ Assert.assertNull(allocs.get(NodeId.newInstance("c", 3)));
+ Assert.assertNull(allocs.get(NodeId.newInstance("d", 4)));
+
+ // New Allocate request
+ allocateRequest = Records.newRecord(AllocateRequest.class);
+ opportunisticReq = Records.newRecord(ResourceRequest.class);
+ opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+ opportunisticReq.setNumContainers(6);
+ opportunisticReq.setCapability(Resource.newInstance(512, 3));
+ opportunisticReq.setPriority(Priority.newInstance(100));
+ opportunisticReq.setRelaxLocality(true);
+ opportunisticReq.setResourceName("*");
+ allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
+
+ // Verify 6 containers were allocated
+ allocateResponse = localScheduler.allocate(allocateRequest);
+ Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());
+
+ // Verify the new containers are equally distributed on hosts c and d
+ // And None on a and b
+ allocs = mapAllocs(allocateResponse);
+ Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
+ Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
+ Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
+ Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
+ }
+
+ private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
+ DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
+ (DistSchedAllocateResponse.class);
+ distSchedAllocateResponse.setAllocateResponse(
+ Records.newRecord(AllocateResponse.class));
+ distSchedAllocateResponse.setNodesForScheduling(nodes);
+ return distSchedAllocateResponse;
+ }
+
+ private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
+ allocateResponse) {
+ Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
+ for (Container c : allocateResponse.getAllocatedContainers()) {
+ List<ContainerId> cIds = allocs.get(c.getNodeId());
+ if (cIds == null) {
+ cIds = new ArrayList<>();
+ allocs.put(c.getNodeId(), cIds);
+ }
+ cIds.add(c.getId());
+ }
+ return allocs;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
index 84e42fc..6a72cc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
@@ -96,7 +96,7 @@ public class TestContainerLogsPage {
healthChecker.init(conf);
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
- new ApplicationACLsManager(conf), new NMNullStateStoreService());
+ new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
// Add an application and the corresponding containers
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
String user = "nobody";
@@ -136,7 +136,7 @@ public class TestContainerLogsPage {
when(dirsHandlerForFullDisk.getLogDirsForRead()).
thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
- new ApplicationACLsManager(conf), new NMNullStateStoreService());
+ new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
nmContext.getApplications().put(appId, app);
container.setState(ContainerState.RUNNING);
nmContext.getContainers().put(container1, container);
@@ -158,7 +158,7 @@ public class TestContainerLogsPage {
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
- new ApplicationACLsManager(conf), new NMNullStateStoreService());
+ new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
// Add an application and the corresponding containers
String user = "nobody";
long clusterTimeStamp = 1234;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
index e1845c7..39e8394 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
@@ -87,7 +87,7 @@ public class TestNMWebServer {
private int startNMWebAppServer(String webAddr) {
Context nmContext = new NodeManager.NMContext(null, null, null, null,
- null);
+ null, false);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@@ -150,7 +150,7 @@ public class TestNMWebServer {
@Test
public void testNMWebApp() throws IOException, YarnException {
Context nmContext = new NodeManager.NMContext(null, null, null, null,
- null);
+ null, false);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 1f5590c..2ac0956 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -111,7 +111,7 @@ public class TestNMWebServices extends JerseyTestBase {
healthChecker.init(conf);
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
- aclsManager, null);
+ aclsManager, null, false);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
index e274abb..dfbcf06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
@@ -104,7 +104,7 @@ public class TestNMWebServicesApps extends JerseyTestBase {
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
- aclsManager, null);
+ aclsManager, null, false);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
index 0ed56d3..efad825 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
@@ -132,7 +132,7 @@ public class TestNMWebServicesContainers extends JerseyTestBase {
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
- aclsManager, null) {
+ aclsManager, null, false) {
public NodeId getNodeId() {
return NodeId.newInstance("testhost.foo.com", 8042);
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index ab94175..4f90fa0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -89,6 +91,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+ .AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -104,21 +108,27 @@ public class ApplicationMasterService extends AbstractService implements
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
private final AMLivelinessMonitor amLivelinessMonitor;
private YarnScheduler rScheduler;
- private InetSocketAddress masterServiceAddress;
- private Server server;
- private final RecordFactory recordFactory =
+ protected InetSocketAddress masterServiceAddress;
+ protected Server server;
+ protected final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
- private final RMContext rmContext;
+ protected final RMContext rmContext;
- public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
- super(ApplicationMasterService.class.getName());
+ public ApplicationMasterService(String name, RMContext rmContext,
+ YarnScheduler scheduler) {
+ super(name);
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
this.rmContext = rmContext;
}
+ public ApplicationMasterService(RMContext rmContext,
+ YarnScheduler scheduler) {
+ this(ApplicationMasterService.class.getName(), rmContext, scheduler);
+ }
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
masterServiceAddress = conf.getSocketAddr(
@@ -139,11 +149,8 @@ public class ApplicationMasterService extends AbstractService implements
serverConf.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
SaslRpcServer.AuthMethod.TOKEN.toString());
- this.server =
- rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
- serverConf, this.rmContext.getAMRMTokenSecretManager(),
- serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+ this.server = getServer(rpc, serverConf, masterServiceAddress,
+ this.rmContext.getAMRMTokenSecretManager());
// Enable service authorization?
if (conf.getBoolean(
@@ -158,7 +165,7 @@ public class ApplicationMasterService extends AbstractService implements
}
refreshServiceAcls(conf, RMPolicyProvider.getInstance());
}
-
+
this.server.start();
this.masterServiceAddress =
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
@@ -168,6 +175,14 @@ public class ApplicationMasterService extends AbstractService implements
super.serviceStart();
}
+ protected Server getServer(YarnRPC rpc, Configuration serverConf,
+ InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+ return rpc.getServer(ApplicationMasterProtocol.class, this, addr,
+ serverConf, secretManager,
+ serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+ }
+
@Private
public InetSocketAddress getBindAddress() {
return this.masterServiceAddress;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
new file mode 100644
index 0000000..5210f7f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+ .AMRMTokenSecretManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
+public class DistributedSchedulingService extends ApplicationMasterService
+ implements DistributedSchedulerProtocol {
+
+ public DistributedSchedulingService(RMContext rmContext,
+ YarnScheduler scheduler) {
+ super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+ }
+
+ @Override
+ public Server getServer(YarnRPC rpc, Configuration serverConf,
+ InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+ Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
+ addr, serverConf, secretManager,
+ serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+ // To support applications running on NMs that DO NOT support
+ // Dist Scheduling...
+ ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ ApplicationMasterProtocolPB.class,
+ ApplicationMasterProtocolService.newReflectiveBlockingService(
+ new ApplicationMasterProtocolPBServiceImpl(this)));
+ return server;
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster
+ (RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return super.registerApplicationMaster(request);
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster
+ (FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return super.finishApplicationMaster(request);
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request) throws
+ YarnException, IOException {
+ return super.allocate(request);
+ }
+
+ @Override
+ public DistSchedRegisterResponse
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ RegisterApplicationMasterResponse response =
+ registerApplicationMaster(request);
+ DistSchedRegisterResponse dsResp = recordFactory
+ .newRecordInstance(DistSchedRegisterResponse.class);
+ dsResp.setRegisterResponse(response);
+ dsResp.setMinAllocatableCapabilty(
+ Resource.newInstance(
+ getConfig().getInt(
+ YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
+ YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
+ getConfig().getInt(
+ YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
+ )
+ );
+ dsResp.setMaxAllocatableCapabilty(
+ Resource.newInstance(
+ getConfig().getInt(
+ YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
+ YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
+ getConfig().getInt(
+ YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
+ )
+ );
+ dsResp.setIncrAllocatableCapabilty(
+ Resource.newInstance(
+ getConfig().getInt(
+ YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
+ YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
+ getConfig().getInt(
+ YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
+ )
+ );
+ dsResp.setContainerTokenExpiryInterval(
+ getConfig().getInt(
+ YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+ YarnConfiguration.
+ DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
+ dsResp.setContainerIdStart(
+ this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
+
+ // Set nodes to be used for scheduling
+ // TODO: The actual computation of the list will happen in YARN-4412
+ // TODO: Till then, send the complete list
+ dsResp.setNodesForScheduling(
+ new ArrayList<>(this.rmContext.getRMNodes().keySet()));
+ return dsResp;
+ }
+
+ @Override
+ public DistSchedAllocateResponse allocateForDistributedScheduling
+ (AllocateRequest request) throws YarnException, IOException {
+ AllocateResponse response = allocate(request);
+ DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
+ (DistSchedAllocateResponse.class);
+ dsResp.setAllocateResponse(response);
+ dsResp.setNodesForScheduling(
+ new ArrayList<>(this.rmContext.getRMNodes().keySet()));
+ return dsResp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index b2950bb..b51f00d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -134,6 +134,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+ /**
+ * Used for generation of various ids.
+ */
+ public static final int EPOCH_BIT_SHIFT = 40;
+
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
private static long clusterTimeStamp = System.currentTimeMillis();
@@ -1222,6 +1227,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected ApplicationMasterService createApplicationMasterService() {
+ if (this.rmContext.getYarnConfiguration().getBoolean(
+ YarnConfiguration.DIST_SCHEDULING_ENABLED,
+ YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
+ return new DistributedSchedulingService(this.rmContext, scheduler);
+ }
return new ApplicationMasterService(this.rmContext, scheduler);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index a61001e..568fd4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -91,7 +91,8 @@ public class AppSchedulingInfo {
this.queue = queue;
this.user = user;
this.activeUsersManager = activeUsersManager;
- this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+ this.containerIdCounter =
+ new AtomicLong(epoch << EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index d5b64c1..6182b07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+
+
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.log4j.Level;
@@ -737,6 +739,21 @@ public class MockRM extends ResourceManager {
@Override
protected ApplicationMasterService createApplicationMasterService() {
+ if (this.rmContext.getYarnConfiguration().getBoolean(
+ YarnConfiguration.DIST_SCHEDULING_ENABLED,
+ YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
+ return new DistributedSchedulingService(getRMContext(), scheduler) {
+ @Override
+ protected void serviceStart() {
+ // override to not start rpc handler
+ }
+
+ @Override
+ protected void serviceStop() {
+ // don't do anything
+ }
+ };
+ }
return new ApplicationMasterService(getRMContext(), scheduler) {
@Override
protected void serviceStart() {