You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [17/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,556 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.client.ClientService;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+
+/**
+ * Keeps the data structures to send container requests to RM.
+ */
+// TODO XXX: Eventually rename to RMCommunicator
+public class RMContainerRequestor extends RMCommunicator implements ContainerRequestor {
+
+ private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
+ static final String ANY = "*";
+
+ private final Clock clock;
+
+ private Resource availableResources; // aka headroom.
+ private long retrystartTime;
+ private long retryInterval;
+
+ private int numContainerReleaseRequests;
+ private int numContainersAllocated;
+ private int numFinishedContainers; // Not very useful.
+
+
+ //Key -> Priority
+ //Value -> Map
+ // Key->ResourceName (e.g., hostname, rackname, *)
+ // Value->Map
+ // Key->Resource Capability
+ // Value->ResourceRequest
+ private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+ remoteRequestsTable =
+ new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+ private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
+ private final Set<ContainerId> release = new TreeSet<ContainerId>();
+
+ private Lock releaseLock = new ReentrantLock();
+ private Lock askLock = new ReentrantLock();
+ private final List<ContainerId> emptyReleaseList = new ArrayList<ContainerId>(0);
+ private final List<ResourceRequest> emptyAskList = new ArrayList<ResourceRequest>();
+
+ private int clusterNmCount = 0;
+
+ // TODO Consider allowing sync comm between the requestor and allocator...
+
+ // TODO Why does the RMRequestor require the ClientService ??
+ // (for the RPC address. get rid of this.)
+ public RMContainerRequestor(ClientService clientService, AppContext context) {
+ super(clientService, context);
+ this.clock = context.getClock();
+ }
+
+ public static class ContainerRequest {
+ final Resource capability;
+ final String[] hosts;
+ final String[] racks;
+ final Priority priority;
+
+ public ContainerRequest(Resource capability, String[] hosts,
+ String[] racks, Priority priority) {
+ this.capability = capability;
+ this.hosts = hosts;
+ this.racks = racks;
+ this.priority = priority;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Capability[").append(capability).append("]");
+ sb.append("Priority[").append(priority).append("]");
+ return sb.toString();
+ }
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ retrystartTime = clock.getTime();
+ retryInterval = getConfig().getLong(
+ MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+ MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+ }
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return this.applicationACLs;
+ }
+
+ public void stop(Configuration conf) {
+ LOG.info("NumAllocatedContainers: " + numContainersAllocated
+ + "NumFinihsedContainers: " + numFinishedContainers
+ + "NumReleaseRequests: " + numContainerReleaseRequests);
+ super.stop();
+ }
+
+ @Override
+ public Resource getAvailableResources() {
+ return availableResources;
+ }
+
+ public void addContainerReq(ContainerRequest req) {
+ // Create resource requests
+ for (String host : req.hosts) {
+ // Data-local
+ // Assumes the scheduler is handling bad nodes. Tracking them here would
+ // lead to an out-of-sync scheduler / requestor.
+ addResourceRequest(req.priority, host, req.capability);
+ }
+
+ // Nothing Rack-local for now
+ for (String rack : req.racks) {
+ addResourceRequest(req.priority, rack, req.capability);
+ }
+
+ // Off-switch
+ addResourceRequest(req.priority, ANY, req.capability);
+ }
+
+ public void decContainerReq(ContainerRequest req) {
+ // Update resource requests
+ for (String hostName : req.hosts) {
+ decResourceRequest(req.priority, hostName, req.capability);
+ }
+
+ for (String rack : req.racks) {
+ decResourceRequest(req.priority, rack, req.capability);
+ }
+
+ decResourceRequest(req.priority, ANY, req.capability);
+ }
+
+ private void addResourceRequest(Priority priority, String resourceName,
+ Resource capability) {
+ addResourceRequest(priority, resourceName, capability, 1);
+ }
+
+ private void addResourceRequest(Priority priority, String resourceName,
+ Resource capability, int increment) {
+ Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+ this.remoteRequestsTable.get(priority);
+ if (remoteRequests == null) {
+ remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+ this.remoteRequestsTable.put(priority, remoteRequests);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added priority=" + priority);
+ }
+ }
+ Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+ if (reqMap == null) {
+ reqMap = new HashMap<Resource, ResourceRequest>();
+ remoteRequests.put(resourceName, reqMap);
+ }
+ ResourceRequest remoteRequest = reqMap.get(capability);
+ if (remoteRequest == null) {
+ remoteRequest = Records.newRecord(ResourceRequest.class);
+ remoteRequest.setPriority(priority);
+ remoteRequest.setHostName(resourceName);
+ remoteRequest.setCapability(capability);
+ remoteRequest.setNumContainers(0);
+ reqMap.put(capability, remoteRequest);
+ }
+ remoteRequest.setNumContainers(remoteRequest.getNumContainers() + increment);
+ // 0 is a special case to re-add the request to the ask table.
+
+ // Note this down for next interaction with ResourceManager
+ int askSize = 0;
+ askLock.lock();
+ try {
+ ask.add(remoteRequest);
+ askSize = ask.size();
+ } finally {
+ askLock.unlock();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addResourceRequest:" + " applicationId="
+ + applicationId.getId() + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + askSize);
+ }
+ }
+
+ private void decResourceRequest(Priority priority, String resourceName,
+ Resource capability) {
+ Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+ this.remoteRequestsTable.get(priority);
+ Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+ if (reqMap == null) {
+ // as we modify the resource requests by filtering out blacklisted hosts
+ // when they are added, this value may be null when being
+ // decremented
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not decrementing resource as " + resourceName
+ + " is not present in request table");
+ }
+ return;
+ }
+ ResourceRequest remoteRequest = reqMap.get(capability);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ + applicationId.getId() + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + getAskSize());
+ }
+
+ remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
+ if (remoteRequest.getNumContainers() == 0) {
+ reqMap.remove(capability);
+ if (reqMap.size() == 0) {
+ remoteRequests.remove(resourceName);
+ }
+ if (remoteRequests.size() == 0) {
+ remoteRequestsTable.remove(priority);
+ }
+ //remove from ask if it may have
+ askLock.lock();
+ try {
+ ask.remove(remoteRequest);
+ } finally {
+ askLock.unlock();
+ }
+ } else {
+ askLock.lock();
+ try {
+ ask.add(remoteRequest);//this will override the request if ask doesn't
+ //already have it.
+ } finally {
+ askLock.unlock();
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.info("AFTER decResourceRequest:" + " applicationId="
+ + applicationId.getId() + " priority=" + priority.getPriority()
+ + " resourceName=" + resourceName + " numContainers="
+ + remoteRequest.getNumContainers() + " #asks=" + getAskSize());
+ }
+ }
+
+ private int getAskSize() {
+ askLock.lock();
+ try {
+ return ask.size();
+ } finally {
+ askLock.unlock();
+ }
+ }
+
+ private String getStat() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ContainersAllocated: ").append(numContainersAllocated)
+ .append(", ContainersFinished: ").append(numFinishedContainers)
+ .append(", NumContainerReleaseRequests: ")
+ .append(numContainerReleaseRequests);
+ return sb.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void heartbeat() throws Exception {
+ LOG.info("BeforeHeartbeat: " + getStat());
+ int headRoom = getAvailableResources() != null ? getAvailableResources()
+ .getMemory() : 0;// first time it would be null
+ int lastClusterNmCount = clusterNmCount;
+ AllocateResponse response = errorCheckedMakeRemoteRequest();
+
+ int newHeadRoom = getAvailableResources() != null ? getAvailableResources()
+ .getMemory() : 0;
+ List<Container> newContainers = response.getAllocatedContainers();
+ logNewContainers(newContainers);
+ numContainersAllocated += newContainers.size();
+
+ List<ContainerStatus> finishedContainers = response
+ .getCompletedContainersStatuses();
+ logFinishedContainers(finishedContainers);
+ numFinishedContainers += finishedContainers.size();
+
+ List<NodeReport> updatedNodeReports = response.getUpdatedNodes();
+ logUpdatedNodes(updatedNodeReports);
+
+ LOG.info("AfterHeartbeat: " + getStat());
+
+ if (clusterNmCount != lastClusterNmCount) {
+ LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+ + clusterNmCount);
+ eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
+ }
+
+ // Inform the Containers about completion..
+ for (ContainerStatus c : finishedContainers) {
+ eventHandler.handle(new AMContainerEventCompleted(c));
+ }
+
+ // Inform the scheduler about new containers.
+ List<ContainerId> newContainerIds;
+ if (newContainers.size() > 0) {
+ newContainerIds = new ArrayList<ContainerId>(newContainers.size());
+ for (Container container : newContainers) {
+ context.getAllContainers().addContainerIfNew(container);
+ newContainerIds.add(container.getId());
+ context.getAllNodes().nodeSeen(container.getNodeId());
+ eventHandler.handle(new AMNodeEventContainerAllocated(container
+ .getNodeId(), container.getId()));
+ }
+ eventHandler.handle(new AMSchedulerEventContainersAllocated(
+ newContainerIds, (newHeadRoom - headRoom != 0)));
+ }
+
+ //Inform the nodes about sate changes.
+ for (NodeReport nr : updatedNodeReports) {
+ eventHandler.handle(new AMNodeEventStateChanged(nr));
+ // Allocator will find out from the node, if at all.
+ // Relying on the RM to not allocated containers on an unhealthy node.
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ protected AllocateResponse errorCheckedMakeRemoteRequest() throws Exception {
+ AllocateResponse response = null;
+ try {
+ response = makeRemoteRequest();
+ // Reset retry count if no exception occurred.
+ retrystartTime = clock.getTime();
+ } catch (Exception e) {
+ // This can happen when the connection to the RM has gone down. Keep
+ // re-trying until the retryInterval has expired.
+ if (clock.getTime() - retrystartTime >= retryInterval) {
+ LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
+ eventHandler.handle(new DAGEvent(this.getJob().getID(),
+ DAGEventType.INTERNAL_ERROR));
+ throw new YarnException("Could not contact RM after " +
+ retryInterval + " milliseconds.");
+ }
+ // Throw this up to the caller, which may decide to ignore it and
+ // continue to attempt to contact the RM.
+ throw e;
+ }
+ if (response.getReboot()) {
+ // This can happen if the RM has been restarted. If it is in that state,
+ // this application must clean itself up.
+ eventHandler.handle(new DAGEvent(this.getJob().getID(),
+ DAGEventType.INTERNAL_ERROR));
+ throw new YarnException("Resource Manager doesn't recognize AttemptId: "
+ + this.getContext().getApplicationID());
+ }
+ return response;
+ }
+
+
+ protected AllocateResponse makeRemoteRequest() throws Exception {
+ List<ContainerId> clonedReleaseList = cloneAndClearReleaseList();
+ List<ResourceRequest> clonedAskList = cloneAndClearAskList();
+
+ AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+ applicationAttemptId, lastResponseID, super.getApplicationProgress(),
+ clonedAskList, clonedReleaseList);
+ AllocateResponse allocateResponse = null;
+ try {
+ allocateResponse = scheduler.allocate(allocateRequest);
+ } catch (Exception e) {
+ rePopulateListsOnError(clonedReleaseList, clonedAskList);
+ throw e;
+ }
+ lastResponseID = allocateResponse.getResponseId();
+ availableResources = allocateResponse.getAvailableResources();
+ clusterNmCount = allocateResponse.getNumClusterNodes();
+
+ if (clonedAskList.size() > 0 || clonedReleaseList.size() > 0) {
+ LOG.info("getResources() for " + applicationId + ":" + " ask="
+ + clonedAskList.size() + " release= " + clonedReleaseList.size()
+ + " newContainers="
+ + allocateResponse.getAllocatedContainers().size()
+ + " finishedContainers="
+ + allocateResponse.getCompletedContainersStatuses().size()
+ + " resourcelimit=" + availableResources + " knownNMs="
+ + clusterNmCount);
+ }
+
+ return allocateResponse;
+ }
+
+ @Override
+ public void handle(RMCommunicatorEvent rawEvent) {
+ switch(rawEvent.getType()) {
+ case CONTAINER_DEALLOCATE:
+ RMCommunicatorEventContainerDeAllocateRequest event = (RMCommunicatorEventContainerDeAllocateRequest) rawEvent;
+ releaseLock.lock();
+ try {
+ numContainerReleaseRequests++;
+ release.add(event.getContainerId());
+ } finally {
+ releaseLock.unlock();
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+
+ private List<ContainerId> cloneAndClearReleaseList() {
+ ArrayList<ContainerId> clonedReleaseList;
+ releaseLock.lock();
+ try {
+ if (release.size() == 0) {
+ return emptyReleaseList;
+ }
+ clonedReleaseList = new ArrayList<ContainerId>(release);
+ release.clear();
+ return clonedReleaseList;
+ } finally {
+ releaseLock.unlock();
+ }
+ }
+
+ private List<ResourceRequest> cloneAndClearAskList() {
+ ArrayList<ResourceRequest> clonedAskList;
+ askLock.lock();
+ try {
+ if (ask.size() == 0) {
+ return emptyAskList;
+ }
+ clonedAskList = new ArrayList<ResourceRequest>(ask);
+ ask.clear();
+ return clonedAskList;
+ } finally {
+ askLock.unlock();
+ }
+ }
+
+ private void rePopulateListsOnError(List<ContainerId> clonedReleaseList,
+ List<ResourceRequest> clonedAskList) {
+ releaseLock.lock();
+ try {
+ release.addAll(clonedReleaseList);
+ } finally {
+ releaseLock.unlock();
+ }
+ askLock.lock();
+ try {
+ // Asks for a particular ressource could have changed (increased or
+ // decresed) during the failure. Re-pull the list from the
+ // remoteRequestTable. ask being a hashSet and using the same objects
+ // avoids duplicates.
+ rePopulateAskList(clonedAskList);
+ } finally {
+ askLock.unlock();
+ }
+ }
+
+ private void rePopulateAskList(List<ResourceRequest> clonedAskList) {
+ for (ResourceRequest rr : clonedAskList) {
+ addResourceRequest(rr.getPriority(), rr.getHostName(),
+ rr.getCapability(), 0);
+ }
+ }
+
+ private void logNewContainers(List<Container> newContainers) {
+ if (newContainers.size() > 0) {
+ LOG.info("Got allocated " + newContainers.size() + " containers");
+ for (Container c : newContainers) {
+ LOG.info("AllocatedContainer: " + c);
+ }
+ }
+ }
+
+ private void logFinishedContainers(List<ContainerStatus> finishedContainers) {
+ if (finishedContainers.size() > 0) {
+ LOG.info(finishedContainers.size() + " containers finished");
+ for (ContainerStatus cs : finishedContainers) {
+ LOG.info("FinihsedContainer: " + cs);
+ }
+ }
+ }
+
+ private void logUpdatedNodes(List<NodeReport> nodeReports) {
+ if (nodeReports.size() > 0) {
+ LOG.info(nodeReports.size() + " nodes changed state");
+ for (NodeReport nr : nodeReports) {
+ LOG.info("UpdatedNodeReport: " + nr);
+ }
+ }
+ }
+
+ @Private
+ Map<Priority, Map<String, Map<Resource, ResourceRequest>>> getRemoteRequestTable() {
+ return remoteRequestsTable;
+ }
+
+ @Private
+ Set<ResourceRequest> getAskSet() {
+ return ask;
+ }
+
+ @Private
+ Set<ContainerId> getReleaseSet() {
+ return release;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,460 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.dag.app.rm.AMRMClient.ContainerRequest;
+import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/* TODO not yet updating cluster nodes on every allocate response
+ * from RMContainerRequestor
+ import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
+ if (clusterNmCount != lastClusterNmCount) {
+ LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+ + clusterNmCount);
+ eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
+ }
+ */
+public class TaskScheduler extends AbstractService
+ implements AMRMClientAsync.CallbackHandler {
+ private static final Log LOG = LogFactory.getLog(TaskScheduler.class);
+
+ public interface TaskSchedulerAppCallback {
+ public class AppFinalStatus {
+ public final FinalApplicationStatus exitStatus;
+ public final String exitMessage;
+ public final String postCompletionTrackingUrl;
+ public AppFinalStatus(FinalApplicationStatus exitStatus,
+ String exitMessage,
+ String posCompletionTrackingUrl) {
+ this.exitStatus = exitStatus;
+ this.exitMessage = exitMessage;
+ this.postCompletionTrackingUrl = posCompletionTrackingUrl;
+ }
+ }
+ // upcall to app must be outside locks
+ public void taskAllocated(Object task,
+ Object appCookie,
+ Container container);
+ // this may end up being called for a task+container pair that the app
+ // has not heard about. this can happen because of a race between
+ // taskAllocated() upcall and deallocateTask() downcall
+ public void containerCompleted(Object taskLastAllocated,
+ ContainerStatus containerStatus);
+ public void nodesUpdated(List<NodeReport> updatedNodes);
+ public void appRebootRequested();
+ public void setApplicationRegistrationData(
+ Resource minContainerCapability,
+ Resource maxContainerCapability,
+ Map<ApplicationAccessType, String> appAcls
+ );
+ public void onError(Exception e);
+ public float getProgress();
+ public AppFinalStatus getFinalAppStatus();
+ }
+
+ final AMRMClientAsync<CRCookie> amRmClient;
+ final TaskSchedulerAppCallback appClient;
+
+ Map<Object, ContainerRequest<CRCookie>> taskRequests =
+ new HashMap<Object, ContainerRequest<CRCookie>>();
+ Map<Object, Container> taskAllocations =
+ new HashMap<Object, Container>();
+ Map<ContainerId, Object> containerAssigments =
+ new HashMap<ContainerId, Object>();
+ HashMap<ContainerId, Object> releasedContainers =
+ new HashMap<ContainerId, Object>();
+
+ final String appHostName;
+ final int appHostPort;
+ final String appTrackingUrl;
+
+ class CRCookie {
+ Object task;
+ Object appCookie;
+ }
+
+ public TaskScheduler(ApplicationAttemptId id,
+ TaskSchedulerAppCallback appClient,
+ String appHostName,
+ int appHostPort,
+ String appTrackingUrl) {
+ super(TaskScheduler.class.getName());
+ this.appClient = appClient;
+ this.amRmClient = new AMRMClientAsync<CRCookie>(id, 1000, this);
+ this.appHostName = appHostName;
+ this.appHostPort = appHostPort;
+ this.appTrackingUrl = appTrackingUrl;
+ }
+
+ @Private
+ @VisibleForTesting
+ TaskScheduler(ApplicationAttemptId id,
+ TaskSchedulerAppCallback appClient,
+ String appHostName,
+ int appHostPort,
+ String appTrackingUrl,
+ AMRMClientAsync<CRCookie> client) {
+ super(TaskScheduler.class.getName());
+ this.appClient = appClient;
+ this.amRmClient = client;
+ this.appHostName = appHostName;
+ this.appHostPort = appHostPort;
+ this.appTrackingUrl = appTrackingUrl;
+ }
+
+ public Resource getClusterAvailableResources() {
+ return amRmClient.getClusterAvailableResources();
+ }
+
+ public int getClusterNodeCount() {
+ return amRmClient.getClusterNodeCount();
+ }
+
+ // AbstractService methods
+ @Override
+ public synchronized void init(Configuration conf) {
+ super.init(conf);
+ amRmClient.init(conf);
+ }
+
+ @Override
+ public void start() {
+ try {
+ RegisterApplicationMasterResponse response = null;
+ synchronized (this) {
+ amRmClient.start();
+ super.start();
+ response = amRmClient.registerApplicationMaster(appHostName,
+ appHostPort,
+ appTrackingUrl);
+ }
+ // upcall to app outside locks
+ appClient.setApplicationRegistrationData(
+ response.getMinimumResourceCapability(),
+ response.getMaximumResourceCapability(),
+ response.getApplicationACLs());
+ } catch (YarnRemoteException e) {
+ LOG.error("Exception while registering", e);
+ throw new YarnException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ // upcall to app outside of locks
+ AppFinalStatus status = appClient.getFinalAppStatus();
+ try {
+ // FIXME make this optional for the reboot case
+ synchronized (this) {
+ amRmClient.unregisterApplicationMaster(status.exitStatus,
+ status.exitMessage,
+ status.postCompletionTrackingUrl);
+ amRmClient.stop();
+ super.stop();
+ }
+ } catch (YarnRemoteException e) {
+ LOG.error("Exception while unregistering ", e);
+ throw new YarnException(e);
+ }
+ }
+
+ // AMRMClientAsync interface methods
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+ Map<Object, ContainerStatus> appContainerStatus =
+ new HashMap<Object, ContainerStatus>(statuses.size());
+ synchronized (this) {
+ for(ContainerStatus containerStatus : statuses) {
+ ContainerId completedId = containerStatus.getContainerId();
+ Object task = releasedContainers.remove(completedId);
+ if(task != null){
+ // TODO later we may want to check if exit code matched expectation
+ // e.g. successful container should not come back fail exit code after
+ // being released
+ // completion of a container we had released earlier
+ // an allocated container completed. notify app
+ LOG.info("Released container completed:" + completedId +
+ " last allocated to task: " + task);
+ appContainerStatus.put(task, containerStatus);
+ continue;
+ }
+
+ // not found in released containers. check currently allocated containers
+ // no need to release this container as the RM has already completed it
+ task = unAssignContainer(completedId, false);
+ if(task != null) {
+ // completion of a container we have allocated currently
+ // an allocated container completed. notify app
+ LOG.info("Allocated container completed:" + completedId +
+ " last allocated to task: " + task);
+ appContainerStatus.put(task, containerStatus);
+ continue;
+ }
+
+ // container neither allocated nor released
+ LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
+ }
+ }
+
+ // upcall to app must be outside locks
+ for(Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
+ appClient.containerCompleted(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void onContainersAllocated(List<Container> containers) {
+ Map<ContainerRequest<CRCookie>, Container> appContainers =
+ new HashMap<ContainerRequest<CRCookie>, Container>(containers.size());
+ synchronized (this) {
+ for(Container container : containers) {
+ String location = container.getNodeId().getHost();
+ ContainerRequest<CRCookie> assigned = getMatchingRequest(container, location);
+ if(assigned == null) {
+ location = RackResolver.resolve(location).getNetworkLocation();
+ assigned = getMatchingRequest(container, location);
+ }
+ if(assigned == null) {
+ location = ResourceRequest.ANY;
+ assigned = getMatchingRequest(container, location);
+ }
+ if(assigned == null) {
+ // not matched anything. release container
+ // Probably we cancelled a request and RM allocated that to us
+ // before RM heard of the cancellation
+ releaseContainer(container.getId(), null);
+ LOG.info("No RM requests matching container: " + container);
+ continue;
+ }
+
+ Object task = getTask(assigned);
+ assert task != null;
+ assignContainer(task, container, assigned);
+ appContainers.put(assigned, container);
+
+ LOG.info("Assigning container: " + container +
+ " for task: " + task +
+ " at locality: " + location);
+
+ }
+ }
+
+ // upcall to app must be outside locks
+ for(Entry<ContainerRequest<CRCookie>, Container> entry : appContainers.entrySet()) {
+ ContainerRequest<CRCookie> assigned = entry.getKey();
+ appClient.taskAllocated(getTask(assigned), getAppCookie(assigned), entry.getValue());
+ }
+ }
+
+ @Override
+ public void onRebootRequest() {
+ // upcall to app must be outside locks
+ appClient.appRebootRequested();
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {
+ // ignore bad nodes for now
+ // upcall to app must be outside locks
+ appClient.nodesUpdated(updatedNodes);
+ }
+
+ @Override
+ public float getProgress() {
+ return appClient.getProgress();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ appClient.onError(e);
+ }
+
+ public synchronized void allocateTask(Object task,
+ Resource capability,
+ String[] hosts,
+ String[] racks,
+ Priority priority,
+ Object clientCookie) {
+ // TODO check for nulls etc
+ ContainerRequest<CRCookie> request =
+ new ContainerRequest<CRCookie>(capability,
+ hosts,
+ racks,
+ priority,
+ 1);
+ // TODO extra memory allocation
+ CRCookie cookie = new CRCookie();
+ cookie.task = task;
+ cookie.appCookie = clientCookie;
+ request.setCookie(cookie);
+
+ addTaskRequest(task, request);
+ LOG.info("Allocation request for task: " + task +
+ " with request: " + request);
+ }
+
+ public synchronized Container deallocateTask(Object task) {
+ ContainerRequest<CRCookie> request = removeTaskRequest(task);
+ if(request != null) {
+ // task not allocated yet
+ LOG.info("Deallocating task: " + task + " before allocation");
+ return null;
+ }
+
+ // task request not present. Look in allocations
+ Container container = unAssignContainer(task, true);
+ if(container != null) {
+ LOG.info("Deallocated task: " + task +
+ " from container: " + container.getId());
+ return container;
+ }
+
+ // task neither requested nor allocated.
+ LOG.info("Ignoring removal of unknown task: " + task);
+ return null;
+ }
+
+ public synchronized Object deallocateContainer(ContainerId containerId) {
+ Object task = unAssignContainer(containerId, true);
+ if(task != null) {
+ LOG.info("Deallocated container: " + containerId +
+ " from task: " + task);
+ return task;
+ }
+
+ LOG.info("Ignoring dealloction of unknown container: " + containerId);
+ return null;
+ }
+
+ private ContainerRequest<CRCookie> getMatchingRequest(Container container, String location) {
+ Priority priority = container.getPriority();
+ Resource capability = container.getResource();
+ ContainerRequest<CRCookie> assigned = null;
+ Collection<ContainerRequest<CRCookie>> requests =
+ amRmClient.getMatchingRequests(priority, location, capability);
+
+ if(requests != null) {
+ // TODO maybe do FIFO
+ Iterator<ContainerRequest<CRCookie>> iterator = requests.iterator();
+ if(iterator.hasNext()) {
+ assigned = requests.iterator().next();
+ }
+ }
+
+ return assigned;
+ }
+
+ private Object getTask(ContainerRequest<CRCookie> request) {
+ return ((CRCookie)request.getCookie()).task;
+ }
+
+ private Object getAppCookie(ContainerRequest<CRCookie> request) {
+ return ((CRCookie)request.getCookie()).appCookie;
+ }
+
+ private void releaseContainer(ContainerId containerId, Object task) {
+ amRmClient.releaseAssignedContainer(containerId);
+ if(task != null) {
+ releasedContainers.put(containerId, task);
+ }
+ }
+
+ private void assignContainer(Object task,
+ Container container,
+ ContainerRequest<CRCookie> assigned) {
+ ContainerRequest<CRCookie> request = removeTaskRequest(task);
+ assert request != null;
+ //assert assigned.equals(request);
+
+ Container result = taskAllocations.put(task, container);
+ assert result == null;
+ containerAssigments.put(container.getId(), task);
+
+ }
+
+ private ContainerRequest<CRCookie> removeTaskRequest(Object task) {
+ ContainerRequest<CRCookie> request = taskRequests.remove(task);
+ if(request != null) {
+ // remove all references of the request from AMRMClient
+ amRmClient.removeContainerRequest(request);
+ }
+ return request;
+ }
+
+ private void addTaskRequest(Object task, ContainerRequest<CRCookie> request) {
+ // FIXME duplicates
+ taskRequests.put(task, request);
+ amRmClient.addContainerRequest(request);
+ }
+
+ private Container unAssignContainer(Object task, boolean releaseIfFound) {
+ Container container = taskAllocations.remove(task);
+ if(container == null) {
+ return null;
+ }
+ containerAssigments.remove(container.getId());
+ if(releaseIfFound) {
+ releaseContainer(container.getId(), task);
+ }
+ return container;
+ }
+
+ private Object unAssignContainer(ContainerId containerId,
+ boolean releaseIfFound) {
+ Object task = containerAssigments.remove(containerId);
+ if(task == null) {
+ return null;
+ }
+ taskAllocations.remove(task);
+ if(releaseIfFound) {
+ releaseContainer(containerId, task);
+ }
+ return task;
+ }
+
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.client.ClientService;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
+import org.apache.tez.dag.app.rm.container.AMContainerState;
+import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
+import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+
+public class TaskSchedulerEventHandler extends AbstractService
+ implements TaskSchedulerAppCallback,
+ EventHandler<AMSchedulerEvent> {
+ static final Log LOG = LogFactory.getLog(TaskSchedulerEventHandler.class);
+
+ protected final AppContext appContext;
+ @SuppressWarnings("rawtypes")
+ private final EventHandler eventHandler;
+ private final TaskScheduler taskScheduler;
+ private DAG job;
+ private Map<ApplicationAccessType, String> appAcls = null;
+ private Thread eventHandlingThread;
+ private volatile boolean stopEventHandling;
+ // Has a signal (SIGTERM etc) been issued?
+ protected volatile boolean isSignalled = false;
+
+ BlockingQueue<AMSchedulerEvent> eventQueue
+ = new LinkedBlockingQueue<AMSchedulerEvent>();
+
+ public TaskSchedulerEventHandler(AppContext appContext,
+ ClientService clientService) {
+ super(TaskSchedulerEventHandler.class.getName());
+ this.appContext = appContext;
+ eventHandler = appContext.getEventHandler();
+ InetSocketAddress serviceAddr = clientService.getBindAddress();
+ taskScheduler =
+ new TaskScheduler(appContext.getApplicationAttemptId(),
+ this,
+ serviceAddr.getHostName(),
+ serviceAddr.getPort(),
+ serviceAddr.getHostName() +
+ ":" + clientService.getHttpPort());
+ }
+
+ public Map<ApplicationAccessType, String> getApplicationAcls() {
+ return appAcls;
+ }
+
+ public void setSignalled(boolean isSignalled) {
+ this.isSignalled = isSignalled;
+ LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+ }
+
+ public Resource getAvailableResources() {
+ return taskScheduler.getClusterAvailableResources();
+ }
+
+ public synchronized void handleEvent(AMSchedulerEvent sEvent) {
+ LOG.info("Processing the event " + sEvent.toString());
+ switch (sEvent.getType()) {
+ case S_TA_LAUNCH_REQUEST:
+ handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
+ break;
+ case S_TA_ENDED: // TaskAttempt considered complete.
+ AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded)sEvent;
+ switch(event.getState()) {
+ case FAILED:
+ case KILLED:
+ handleTAUnsuccessfulEnd((AMSchedulerEventTAEnded) sEvent);
+ break;
+ case SUCCEEDED:
+ handleTASucceeded(event);
+ break;
+ default:
+ throw new YarnException("Unexecpted TA_ENDED state: " + event.getState());
+ }
+ break;
+ case S_CONTAINER_DEALLOCATE:
+ handleContainerDeallocate((AMSchedulerEventDeallocateContainer)sEvent);
+ break;
+ case S_CONTAINERS_ALLOCATED:
+ break;
+ case S_CONTAINER_COMPLETED:
+ case S_NODE_BLACKLISTED:
+ break;
+ case S_NODE_UNHEALTHY:
+ break;
+ case S_NODE_HEALTHY:
+ // Consider changing this to work like BLACKLISTING.
+ break;
+ }
+ }
+
+ @Override
+ public void handle(AMSchedulerEvent event) {
+ int qSize = eventQueue.size();
+ if (qSize != 0 && qSize % 1000 == 0) {
+ LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+ }
+ int remCapacity = eventQueue.remainingCapacity();
+ if (remCapacity < 1000) {
+ LOG.warn("Very low remaining capacity in the event-queue "
+ + "of RMContainerAllocator: " + remCapacity);
+ }
+ try {
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void sendEvent(Event<?> event) {
+ eventHandler.handle(event);
+ }
+
+
+ private void handleContainerDeallocate(
+ AMSchedulerEventDeallocateContainer event) {
+ ContainerId containerId = event.getContainerId();
+ // TODO what happens to the task that was connected to this container?
+ // current assumption is that it will eventually call handleTaStopRequest
+ //TaskAttempt taskAttempt = (TaskAttempt)
+ taskScheduler.deallocateContainer(containerId);
+ // TODO does this container need to be stopped via C_STOP_REQUEST
+ sendEvent(new AMContainerEvent(containerId,
+ AMContainerEventType.C_STOP_REQUEST));
+ }
+
+ private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
+ /*MRxTaskAttemptID aId = event.getAttemptID();
+ attemptToLaunchRequestMap.remove(aId);
+ // TODO XXX: This remove may need to be deferred. Possible for a SUCCESSFUL taskAttempt to fail,
+ // which means the scheduler needs to remember taskAttempt to container assignments for a longer time.
+ boolean removed = pendingReduces.remove(aId);
+ if (!removed) {
+ removed = scheduledRequests.remove(aId);
+ if (!removed) {
+ // Maybe assigned.
+ ContainerId containerId = assignedRequests.remove(aId);
+ if (containerId != null) {
+ // Ask the container to stop.
+ sendEvent(new AMContainerEvent(containerId,
+ AMContainerEventType.C_STOP_REQUEST));
+ // Inform the Node - the task has asked to be STOPPED / has already
+ // stopped.
+ sendEvent(new AMNodeEventTaskAttemptEnded(containerMap
+ .get(containerId).getContainer().getNodeId(), containerId,
+ event.getAttemptID(), event.getState() == TaskAttemptState.FAILED));
+ } else {
+ LOG.warn("Received a STOP request for absent taskAttempt: "
+ + event.getAttemptID());
+ // This could be generated in case of recovery, with unhealthy nodes/
+ // fetch failures. Can be ignored, since Recovered containers don't
+ // need to be stopped.
+ }
+ }
+ }*/
+
+ TaskAttempt attempt = event.getAttempt();
+ Container container = taskScheduler.deallocateTask(attempt);
+ // use stored value of container id in case the scheduler has removed this
+ // assignment because the task has been deallocated earlier.
+ // retroactive case
+ ContainerId attemptContainerId = attempt.getAssignedContainerID();
+
+ if(container != null) {
+ // use scheduler container since it exists
+ ContainerId containerId = container.getId();
+ assert attemptContainerId==null || attemptContainerId.equals(containerId);
+ attemptContainerId = containerId;
+ } else {
+ LOG.info("Task: " + attempt.getID() +
+ " has no container assignment in the scheduler");
+ }
+
+ if (attemptContainerId != null) {
+ // TODO either ways send the necessary events
+ // Ask the container to stop.
+ sendEvent(new AMContainerEvent(attemptContainerId,
+ AMContainerEventType.C_STOP_REQUEST));
+ // Inform the Node - the task has asked to be STOPPED / has already
+ // stopped.
+ sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
+ get(attemptContainerId).getContainer().getNodeId(), attemptContainerId,
+ attempt.getID(), event.getState() == TaskAttemptState.FAILED));
+ }
+ }
+
+ private void handleTASucceeded(AMSchedulerEventTAEnded event) {
+ /*
+ // TODO XXX Remember the assigned containerId even after task success.
+ // Required for TOO_MANY_FETCH_FAILURES
+ attemptToLaunchRequestMap.remove(event.getAttemptID());
+ ContainerId containerId = assignedRequests.remove(event.getAttemptID());
+ if (containerId != null) { // TODO Should not be null. Confirm.
+ sendEvent(new AMContainerTASucceededEvent(containerId,
+ event.getAttemptID()));
+ sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap
+ .get(containerId).getContainer().getNodeId(), containerId,
+ event.getAttemptID()));
+ containerAvailable(containerId);
+ } else {
+ LOG.warn("Received TaskAttemptSucceededEvent for unmapped TaskAttempt: "
+ + event.getAttemptID() + ". Full event: " + event);
+ }*/
+
+ TaskAttempt attempt = event.getAttempt();
+ Container container = taskScheduler.deallocateTask(attempt);
+ if(container != null) {
+ ContainerId containerId = container.getId();
+ assert containerId.equals(event.getUsedContainerId());
+ sendEvent(new AMContainerEventTASucceeded(containerId,
+ event.getAttemptID()));
+ // Inform the Node - the task has asked to be STOPPED / has already
+ // stopped.
+ sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().
+ get(containerId).getContainer().getNodeId(), containerId,
+ event.getAttemptID()));
+ // TODO this is where reuse will happen
+ sendEvent(new AMContainerEvent(containerId,
+ AMContainerEventType.C_STOP_REQUEST));
+ }
+ }
+
+ /*
+ @SuppressWarnings("unchecked")
+ private int maybeComputeNormalizedRequestForType(
+ AMSchedulerTALaunchRequestEvent event, int prevComputedSize) {
+ if (prevComputedSize == 0) {
+ int supportedMaxContainerCapability = appContext.getClusterInfo()
+ .getMaxContainerCapability().getMemory();
+ prevComputedSize = event.getCapability().getMemory();
+ int minSlotMemSize = appContext.getClusterInfo()
+ .getMinContainerCapability().getMemory();
+ prevComputedSize = (int) Math.ceil((float) prevComputedSize
+ / minSlotMemSize)
+ * minSlotMemSize;
+ if (prevComputedSize > supportedMaxContainerCapability) {
+ String diagMsg = " capability required is more than the supported "
+ + "max container capability in the cluster. Killing the Job. "
+ + "ResourceReqt: " + prevComputedSize
+ + " maxContainerCapability:" + supportedMaxContainerCapability;
+ LOG.info(diagMsg);
+ eventHandler.handle(new DAGDiagnosticsUpdateEvent(job.getID(), diagMsg));
+ eventHandler.handle(new DAGEvent(job.getID(), DAGEventType.JOB_KILL));
+ }
+ }
+ return prevComputedSize;
+ }
+ */
+
+ private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
+ /**
+ // Add to queue of pending tasks.
+ recalculateReduceSchedule = true;
+ attemptToLaunchRequestMap.put(event.getAttemptID(), event);
+ if (event.getAttemptID().getTaskID().getTaskType() == TaskType.MAP) {
+ mapResourceReqt = maybeComputeNormalizedRequestForType(event,
+ TaskType.MAP, mapResourceReqt);
+ event.getCapability().setMemory(mapResourceReqt);
+ scheduledRequests.addMap(event);
+ } else { // Reduce
+ reduceResourceReqt = maybeComputeNormalizedRequestForType(event,
+ TaskType.REDUCE, reduceResourceReqt);
+ event.getCapability().setMemory(reduceResourceReqt);
+ if (event.isRescheduled()) {
+ pendingReduces.addFirst(new ContainerRequestInfo(new ContainerRequest(
+ event.getCapability(), event.getHosts(), event.getRacks(),
+ PRIORITY_REDUCE), event));
+ } else {
+ pendingReduces.addLast(new ContainerRequestInfo(new ContainerRequest(
+ event.getCapability(), event.getHosts(), event.getRacks(),
+ PRIORITY_REDUCE), event));
+ }
+ }
+ */
+ // TODO resource adjustment needs to move into dag
+ /*Resource mapResourceReqt = maybeComputeNormalizedRequestForType(event,
+ TaskType.MAP, mapResourceReqt);
+ event.getCapability().setMemory(mapResourceReqt);*/
+ TaskAttempt taskAttempt = event.getTaskAttempt();
+ taskScheduler.allocateTask(taskAttempt,
+ event.getCapability(),
+ event.getHosts(),
+ event.getRacks(),
+ event.getPriority(),
+ event);
+ }
+
+ // AbstractService methods
+ @Override
+ public synchronized void init(Configuration conf) {
+ super.init(conf);
+ taskScheduler.init(conf);
+ // todo set heartbeat value from conf here
+ }
+
+ @Override
+ public synchronized void start() {
+ // FIXME hack alert how is this supposed to support multiple DAGs?
+ // Answer: this is shared across dags. need job==app-dag-master
+ job = appContext.getDAG();
+ taskScheduler.start();
+ this.eventHandlingThread = new Thread() {
+ @Override
+ public void run() {
+
+ AMSchedulerEvent event;
+
+ while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+ try {
+ event = TaskSchedulerEventHandler.this.eventQueue.take();
+ } catch (InterruptedException e) {
+ LOG.error("Returning, interrupted : " + e);
+ continue;
+ }
+
+ try {
+ handleEvent(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " to the TaskScheduler", t);
+ // Kill the AM.
+ sendEvent(new DAGEvent(job.getID(), DAGEventType.INTERNAL_ERROR));
+ return;
+ }
+ }
+ }
+ };
+ this.eventHandlingThread.start();
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ this.stopEventHandling = true;
+ if (eventHandlingThread != null)
+ eventHandlingThread.interrupt();
+ taskScheduler.stop();
+ super.stop();
+ }
+
+ // TaskSchedulerAppCallback methods
+ @Override
+ public synchronized void taskAllocated(Object task,
+ Object appCookie,
+ Container container) {
+ /*
+ availableContainerIds.addAll(event.getContainerIds());
+
+ completedMaps = getJob().getCompletedMaps();
+ completedReduces = getJob().getCompletedReduces();
+ int completedTasks = completedMaps + completedReduces;
+
+ if (lastCompletedTasks != completedTasks) {
+ recalculateReduceSchedule = true;
+ lastCompletedTasks = completedTasks;
+ }
+
+ if (event.didHeadroomChange() || event.getContainerIds().size() > 0) {
+ recalculateReduceSchedule = true;
+ }
+ schedule();
+ .....
+ // Update resource requests
+ requestor.decContainerReq(assigned.getContainerRequest());
+
+ // TODO Maybe: ApplicationACLs should be populated into the appContext from the RMCommunicator.
+ ContainerId containerId = allocated.getId();
+ if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
+ AMSchedulerTALaunchRequestEvent tlrEvent = attemptToLaunchRequestMap
+ .get(assigned.getAttemptId());
+ JobConf jobConf = new JobConf(getJob().getConf());
+
+ AMContainerEventLaunchRequest launchRequest = new AMContainerEventLaunchRequest(
+ containerId, jobId, assigned.getAttemptId().getTaskId()
+ .getTaskType(), tlrEvent.getJobToken(),
+ tlrEvent.getCredentials(), shouldProfileTaskAttempt(
+ jobConf, tlrEvent.getRemoteTaskContext()), jobConf);
+
+ eventHandler.handle(launchRequest);
+ }
+ eventHandler.handle(new AMContainerEventAssignTA(containerId,
+ assigned.getAttemptId(), attemptToLaunchRequestMap.get(
+ assigned.getAttemptId()).getRemoteTaskContext()));
+
+ assignedRequests.add(allocated, assigned.getAttemptId());
+ */
+
+ ContainerId containerId = container.getId();
+ appContext.getAllContainers().addContainerIfNew(container);
+ appContext.getAllNodes().nodeSeen(container.getNodeId());
+ sendEvent(new AMNodeEventContainerAllocated(container
+ .getNodeId(), container.getId()));
+
+ AMSchedulerEventTALaunchRequest event =
+ (AMSchedulerEventTALaunchRequest) appCookie;
+ TaskAttempt taskAttempt = event.getTaskAttempt();
+ // TODO - perhaps check if the task still needs this container
+ // because the deallocateTask downcall may have raced with the
+ // taskAllocated() upcall
+ assert task.equals(taskAttempt);
+ if (appContext.getAllContainers().get(containerId).getState()
+ == AMContainerState.ALLOCATED) {
+
+ sendEvent(new AMContainerEventLaunchRequest(
+ containerId,
+ taskAttempt.getID().getTaskID().getVertexID(),
+ event.getJobToken(),
+ event.getCredentials(), false, job.getConf(),
+ taskAttempt.getLocalResources(),
+ taskAttempt.getEnvironment()));
+ }
+ sendEvent(new AMContainerEventAssignTA(containerId,
+ taskAttempt.getID(), event.getRemoteTaskContext()));
+ }
+
+ @Override
+ public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) {
+ // Inform the Containers about completion.
+ sendEvent(new AMContainerEventCompleted(containerStatus));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public synchronized void nodesUpdated(List<NodeReport> updatedNodes) {
+ for (NodeReport nr : updatedNodes) {
+ // Scheduler will find out from the node, if at all.
+ // Relying on the RM to not allocate containers on an unhealthy node.
+ eventHandler.handle(new AMNodeEventStateChanged(nr));
+ }
+ }
+
+ @Override
+ public synchronized void appRebootRequested() {
+ // This can happen if the RM has been restarted. If it is in that state,
+ // this application must clean itself up.
+ // TODO change event to REBOOT
+ // FIXME appReboot != dagReboot
+ // TODO handle multiple dags - Answer: this is shared across dags
+ sendEvent(new DAGEvent(appContext.getDAGID(),
+ DAGEventType.INTERNAL_ERROR));
+ throw new YarnException("ResourceManager requests reboot for: "
+ + appContext.getApplicationID());
+ }
+
+ @Override
+ public synchronized void setApplicationRegistrationData(Resource minContainerCapability,
+ Resource maxContainerCapability,
+ Map<ApplicationAccessType, String> appAcls) {
+ this.appContext.getClusterInfo().setMinContainerCapability(
+ minContainerCapability);
+ this.appContext.getClusterInfo().setMaxContainerCapability(
+ maxContainerCapability);
+ this.appAcls = appAcls;
+ }
+
+ @Override
+ public synchronized AppFinalStatus getFinalAppStatus() {
+ FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+ StringBuffer sb = new StringBuffer();
+ if (job == null) {
+ finishState = FinalApplicationStatus.FAILED;
+ sb.append("Job failed to initialize");
+ } else {
+ if (job.getState() == DAGState.SUCCEEDED) {
+ finishState = FinalApplicationStatus.SUCCEEDED;
+ } else if (job.getState() == DAGState.KILLED
+ || (job.getState() == DAGState.RUNNING && isSignalled)) {
+ finishState = FinalApplicationStatus.KILLED;
+ } else if (job.getState() == DAGState.FAILED
+ || job.getState() == DAGState.ERROR) {
+ finishState = FinalApplicationStatus.FAILED;
+ }
+ for (String s : job.getDiagnostics()) {
+ sb.append(s).append("\n");
+ }
+ }
+ LOG.info("Setting job diagnostics to " + sb.toString());
+
+ String historyUrl = "";
+ /*String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
+ appContext.getApplicationID());
+ LOG.info("History url is " + historyUrl);*/
+
+ return new AppFinalStatus(finishState, sb.toString(), historyUrl);
+ }
+
+ @Override
+ public synchronized float getProgress() {
+ return job.getProgress();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ // TODO Possibly wait for some time and then stop
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public interface AMContainer extends EventHandler<AMContainerEvent>{
+
+ public AMContainerState getState();
+ public ContainerId getContainerId();
+ public Container getContainer();
+ //TODO Rename - CompletedTaskAttempts, ideally means FAILED / KILLED as well.
+ public List<TezTaskAttemptID> getCompletedTaskAttempts();
+ public TezTaskAttemptID getRunningTaskAttempt();
+ public List<TezTaskAttemptID> getQueuedTaskAttempts();
+
+ public int getShufflePort();
+
+ // TODO Add a method to get the containers capabilities - to match taskAttempts.
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class AMContainerEvent extends AbstractEvent<AMContainerEventType> {
+
+ private final ContainerId containerId;
+
+ public AMContainerEvent(ContainerId containerId, AMContainerEventType type) {
+ super(type);
+ this.containerId = containerId;
+ }
+
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class AMContainerEventAssignTA extends AMContainerEvent {
+
+ private final TezTaskAttemptID attemptId;
+ // TODO Maybe have tht TAL pull the remoteTask from the TaskAttempt itself ?
+ private final TezTask remoteTaskContext;
+
+ public AMContainerEventAssignTA(ContainerId containerId,
+ TezTaskAttemptID attemptId, Object remoteTaskContext) {
+ super(containerId, AMContainerEventType.C_ASSIGN_TA);
+ this.attemptId = attemptId;
+ this.remoteTaskContext = (TezTask)remoteTaskContext;
+ }
+
+ public TezTask getRemoteTaskContext() {
+ return this.remoteTaskContext;
+ }
+
+ public TezTaskAttemptID getTaskAttemptId() {
+ return this.attemptId;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+public class AMContainerEventCompleted extends AMContainerEvent {
+
+ private final ContainerStatus containerStatus;
+
+ public AMContainerEventCompleted(ContainerStatus containerStatus) {
+ super(containerStatus.getContainerId(), AMContainerEventType.C_COMPLETED);
+ this.containerStatus = containerStatus;
+ }
+
+ public ContainerStatus getContainerStatus() {
+ return this.containerStatus;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMContainerEventLaunchFailed extends AMContainerEvent {
+
+ private final String message;
+
+ public AMContainerEventLaunchFailed(ContainerId containerId,
+ String message) {
+ super(containerId, AMContainerEventType.C_LAUNCH_FAILED);
+ this.message = message;
+ }
+
+ public String getMessage() {
+ return this.message;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezVertexID;
+
+/**
+ * Container event of type {@code C_LAUNCH_REQUEST}: asks for the identified
+ * container to be launched on behalf of a vertex. It bundles the security
+ * material, configuration, local resources and environment supplied by the
+ * sender. All values are stored by reference, without copying or validation.
+ */
+public class AMContainerEventLaunchRequest extends AMContainerEvent {
+
+  private final TezVertexID targetVertex;
+  private final Token<JobTokenIdentifier> jobTokenRef;
+  private final Credentials creds;
+  private final boolean profilingRequested;
+  private final Configuration configuration;
+  private final Map<String, LocalResource> resourceMap;
+  private final Map<String, String> envMap;
+
+  /**
+   * @param containerId    the container to launch
+   * @param vertexId       vertex the container is launched for
+   * @param jobToken       job token handed to the container
+   * @param credentials    credentials handed to the container
+   * @param shouldProfile  whether profiling was requested
+   * @param conf           configuration to use for the launch
+   * @param localResources local resources keyed by name (stored by reference)
+   * @param environment    environment variables (stored by reference)
+   */
+  public AMContainerEventLaunchRequest(ContainerId containerId,
+      TezVertexID vertexId,
+      Token<JobTokenIdentifier> jobToken,
+      Credentials credentials, boolean shouldProfile, Configuration conf,
+      Map<String, LocalResource> localResources,
+      Map<String, String> environment) {
+    super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
+    targetVertex = vertexId;
+    jobTokenRef = jobToken;
+    creds = credentials;
+    profilingRequested = shouldProfile;
+    configuration = conf;
+    resourceMap = localResources;
+    envMap = environment;
+  }
+
+  /**
+   * Derives the DAG id from the stored vertex id. This throws a
+   * NullPointerException if the event was built with a null vertex id.
+   *
+   * @return the DAG the target vertex belongs to
+   */
+  public TezDAGID getDAGId() {
+    TezVertexID vertex = targetVertex;
+    return vertex.getDAGId();
+  }
+
+  /** @return the vertex this launch is for */
+  public TezVertexID getVertexId() {
+    return targetVertex;
+  }
+
+  /** @return the job token supplied at construction */
+  public Token<JobTokenIdentifier> getJobToken() {
+    return jobTokenRef;
+  }
+
+  /** @return the credentials supplied at construction */
+  public Credentials getCredentials() {
+    return creds;
+  }
+
+  /** @return true if profiling was requested for this launch */
+  public boolean shouldProfile() {
+    return profilingRequested;
+  }
+
+  /** @return the configuration supplied at construction */
+  public Configuration getConf() {
+    return configuration;
+  }
+
+  /** @return the local-resource map supplied at construction (same instance) */
+  public Map<String, LocalResource> getLocalResources() {
+    return resourceMap;
+  }
+
+  /** @return the environment map supplied at construction (same instance) */
+  public Map<String, String> getEnvironment() {
+    return envMap;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@code C_LAUNCHED}: reports that the identified
+ * container was launched, together with the shuffle port value supplied by
+ * the sender.
+ */
+public class AMContainerEventLaunched extends AMContainerEvent {
+
+  /** Shuffle port reported for the launched container. */
+  private final int reportedShufflePort;
+
+  /**
+   * @param containerId the container that was launched
+   * @param shufflePort shuffle port value to record
+   */
+  public AMContainerEventLaunched(ContainerId containerId, int shufflePort) {
+    super(containerId, AMContainerEventType.C_LAUNCHED);
+    reportedShufflePort = shufflePort;
+  }
+
+  /**
+   * @return the shuffle port passed at construction time
+   */
+  public int getShufflePort() {
+    return reportedShufflePort;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
+
+public class AMContainerEventNodeFailed extends AMContainerEvent implements
+ DiagnosableEvent {
+
+ private final String message;
+
+ public AMContainerEventNodeFailed(ContainerId containerId, String message) {
+ super(containerId, AMContainerEventType.C_NODE_FAILED);
+ this.message = message;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return message;
+ }
+
+}
\ No newline at end of file
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Container event of type {@code C_NM_STOP_FAILED}: reports that stopping the
+ * identified container failed, along with a descriptive message.
+ */
+public class AMContainerEventStopFailed extends AMContainerEvent {
+
+  // TODO XXX Per the original author, this event is not being used for
+  // anything yet. It may become useful if we rely less on the RM to inform
+  // the job about container failure.
+
+  /** Failure description exactly as supplied to the constructor. */
+  private final String stopFailureText;
+
+  /**
+   * @param containerId the container that could not be stopped
+   * @param message     description of the stop failure
+   */
+  public AMContainerEventStopFailed(ContainerId containerId, String message) {
+    super(containerId, AMContainerEventType.C_NM_STOP_FAILED);
+    stopFailureText = message;
+  }
+
+  /**
+   * @return the failure description passed at construction time
+   */
+  public String getMessage() {
+    return stopFailureText;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * Container event of type {@code C_TA_SUCCEEDED}: reports that a task attempt
+ * running in the identified container succeeded.
+ */
+public class AMContainerEventTASucceeded extends AMContainerEvent {
+
+  /** The task attempt that succeeded. */
+  private final TezTaskAttemptID succeededAttempt;
+
+  /**
+   * @param containerId the container the attempt ran in
+   * @param attemptId   the attempt that succeeded
+   */
+  public AMContainerEventTASucceeded(ContainerId containerId,
+      TezTaskAttemptID attemptId) {
+    super(containerId, AMContainerEventType.C_TA_SUCCEEDED);
+    succeededAttempt = attemptId;
+  }
+
+  /**
+   * @return the task attempt id passed at construction time
+   */
+  public TezTaskAttemptID getTaskAttemptId() {
+    return succeededAttempt;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java
------------------------------------------------------------------------------
svn:eol-style = native