You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [17/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,556 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.client.ClientService;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+
+/**
+ * Keeps the data structures to send container requests to RM.
+ */
+// TODO XXX: Eventually rename to RMCommunicator
+public class RMContainerRequestor extends RMCommunicator implements ContainerRequestor {
+  
+  private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
+  static final String ANY = "*";
+  
+  private final Clock clock; // obtained from the AppContext; used to time RM retry windows
+
+  private Resource availableResources; // aka headroom; refreshed from each allocate response
+  private long retrystartTime; // reset in init() and after every successful allocate call
+  private long retryInterval; // max time (ms) RM contact may keep failing before INTERNAL_ERROR is raised
+  
+  private int numContainerReleaseRequests; // incremented per CONTAINER_DEALLOCATE event
+  private int numContainersAllocated; // running total of containers returned by allocate calls
+  private int numFinishedContainers; // Not very useful; within this class it is only reported in log output.
+  
+
+  //Key -> Priority
+  //Value -> Map
+  //  Key->ResourceName (e.g., hostname, rackname, *)
+  //  Value->Map
+  //    Key->Resource Capability
+  //    Value->ResourceRequest
+  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+  remoteRequestsTable =
+      new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(); // requests queued for the next allocate call; modified under askLock
+  private final Set<ContainerId> release = new TreeSet<ContainerId>(); // containers queued for release; modified under releaseLock
+  
+  private Lock releaseLock = new ReentrantLock(); // guards the release set
+  private Lock askLock = new ReentrantLock(); // guards the ask set
+  private final List<ContainerId> emptyReleaseList = new ArrayList<ContainerId>(0); // shared placeholder returned when nothing is queued for release
+  private final List<ResourceRequest> emptyAskList = new ArrayList<ResourceRequest>(); // shared placeholder returned when nothing is queued to ask
+  
+  private int clusterNmCount = 0; // node count reported by the most recent allocate response
+  
+  // TODO Consider allowing sync comm between the requestor and allocator... 
+  
+  // TODO Why does the RMRequestor require the ClientService ??
+  // (for the RPC address. get rid of this.)
+  /**
+   * Creates a requestor bound to the given client service and application
+   * context.
+   *
+   * @param clientService handed straight to the parent constructor
+   * @param context application context; also supplies the clock used for
+   *          retry timing
+   */
+  public RMContainerRequestor(ClientService clientService, AppContext context) {
+    super(clientService, context);
+    clock = context.getClock();
+  }
+  
+  /**
+   * Value holder for one container ask: the capability wanted, the preferred
+   * hosts and racks, and the priority of the request.
+   */
+  public static class ContainerRequest {
+    final Resource capability;
+    final String[] hosts;
+    final String[] racks;
+    final Priority priority;
+
+    /**
+     * @param capability resource capability being requested
+     * @param hosts host names the request is placed against
+     * @param racks rack names the request is placed against
+     * @param priority priority of the request
+     */
+    public ContainerRequest(Resource capability, String[] hosts,
+        String[] racks, Priority priority) {
+      this.priority = priority;
+      this.capability = capability;
+      this.racks = racks;
+      this.hosts = hosts;
+    }
+
+    /** Renders capability and priority only; hosts and racks are omitted. */
+    @Override
+    public String toString() {
+      return "Capability[" + capability + "]" + "Priority[" + priority + "]";
+    }
+  }
+
+  /**
+   * Initializes the parent service, then opens the retry window at the
+   * current clock time and reads the retry interval from
+   * {@code MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS}.
+   *
+   * @param conf configuration passed on to the parent service
+   */
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    this.retrystartTime = this.clock.getTime();
+    final long configuredInterval = getConfig().getLong(
+        MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+        MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    this.retryInterval = configuredInterval;
+  }
+  
+  /** Returns the {@code applicationACLs} map held by this communicator. */
+  @Override
+  public Map<ApplicationAccessType, String> getApplicationACLs() {
+    return applicationACLs;
+  }
+
+  /**
+   * Logs the container counters and then stops the parent service.
+   *
+   * NOTE(review): this overload takes a Configuration and carries no
+   * {@code @Override}; confirm that something actually calls it, since the
+   * no-argument stop() is what it delegates to.
+   *
+   * @param conf not used by this method
+   */
+  public void stop(Configuration conf) {
+    // Counters are comma-separated so the values do not run into the next
+    // label, and "Finished" is spelled correctly.
+    LOG.info("NumAllocatedContainers: " + numContainersAllocated
+           + ", NumFinishedContainers: " + numFinishedContainers
+           + ", NumReleaseRequests: " + numContainerReleaseRequests);
+    super.stop();
+  }
+
+  /**
+   * Returns the headroom recorded from the last allocate response, or
+   * {@code null} if no response has been recorded yet.
+   */
+  @Override
+  public Resource getAvailableResources() {
+    return this.availableResources;
+  }
+
+  /**
+   * Registers one container request by adding a single-container ask for
+   * every requested host, every requested rack and finally the wildcard
+   * {@link #ANY}.
+   *
+   * @param req the request to add
+   */
+  public void addContainerReq(ContainerRequest req) {
+    final Priority priority = req.priority;
+    final Resource capability = req.capability;
+
+    // Node-level asks. Bad nodes are not tracked here: the scheduler is
+    // assumed to handle them, and tracking them in both places would let the
+    // scheduler and requestor drift out of sync.
+    final String[] hostNames = req.hosts;
+    for (int i = 0; i < hostNames.length; i++) {
+      addResourceRequest(priority, hostNames[i], capability);
+    }
+
+    // Rack-level asks.
+    final String[] rackNames = req.racks;
+    for (int i = 0; i < rackNames.length; i++) {
+      addResourceRequest(priority, rackNames[i], capability);
+    }
+
+    // Off-switch ask.
+    addResourceRequest(priority, ANY, capability);
+  }
+
+  /**
+   * Withdraws one container request by decrementing the ask for every
+   * requested host, every requested rack and finally the wildcard
+   * {@link #ANY}.
+   *
+   * @param req the request to withdraw
+   */
+  public void decContainerReq(ContainerRequest req) {
+    final Priority priority = req.priority;
+    final Resource capability = req.capability;
+
+    final String[] hostNames = req.hosts;
+    for (int i = 0; i < hostNames.length; i++) {
+      decResourceRequest(priority, hostNames[i], capability);
+    }
+
+    final String[] rackNames = req.racks;
+    for (int i = 0; i < rackNames.length; i++) {
+      decResourceRequest(priority, rackNames[i], capability);
+    }
+
+    decResourceRequest(priority, ANY, capability);
+  }
+
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    addResourceRequest(priority, resourceName, capability, 1);
+  }
+  
+  /**
+   * Adjusts the request entry for the given priority / resource name /
+   * capability by {@code increment} containers, creating the entry (and any
+   * missing intermediate maps) first, and queues it in the ask set for the
+   * next allocate call.
+   *
+   * An increment of 0 leaves the count unchanged and only re-queues the
+   * entry in the ask set.
+   *
+   * @param priority priority of the request
+   * @param resourceName host name, rack name or {@link #ANY}
+   * @param capability resource capability of the request
+   * @param increment number of containers to add to the current count
+   */
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability, int increment) {
+    Map<String, Map<Resource, ResourceRequest>> byResourceName =
+        remoteRequestsTable.get(priority);
+    if (byResourceName == null) {
+      byResourceName = new HashMap<String, Map<Resource, ResourceRequest>>();
+      remoteRequestsTable.put(priority, byResourceName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+
+    Map<Resource, ResourceRequest> byCapability =
+        byResourceName.get(resourceName);
+    if (byCapability == null) {
+      byCapability = new HashMap<Resource, ResourceRequest>();
+      byResourceName.put(resourceName, byCapability);
+    }
+
+    ResourceRequest request = byCapability.get(capability);
+    if (request == null) {
+      // First ask for this combination: start from an empty record.
+      request = Records.newRecord(ResourceRequest.class);
+      request.setPriority(priority);
+      request.setHostName(resourceName);
+      request.setCapability(capability);
+      request.setNumContainers(0);
+      byCapability.put(capability, request);
+    }
+    request.setNumContainers(request.getNumContainers() + increment);
+
+    // Queue the entry for the next interaction with the ResourceManager.
+    int pendingAsks;
+    askLock.lock();
+    try {
+      ask.add(request);
+      pendingAsks = ask.size();
+    } finally {
+      askLock.unlock();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + request.getNumContainers() + " #asks=" + pendingAsks);
+    }
+  }
+
+  /**
+   * Decrements by one the container count of the request entry for the given
+   * priority / resource name / capability.
+   *
+   * When the count reaches zero the entry is removed from the request table
+   * (pruning any maps that become empty) and from the ask set; otherwise the
+   * entry is queued in the ask set. If no matching entry exists at any level
+   * of the table the call is a no-op apart from a debug message.
+   *
+   * @param priority priority of the request
+   * @param resourceName host name, rack name or {@link #ANY}
+   * @param capability resource capability of the request
+   */
+  private void decResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    // Any level may be missing: the priority may never have been added (or
+    // was already drained), the resource name may be absent, or the
+    // capability may have no entry. Treat all three as "nothing to do"
+    // instead of failing with a NullPointerException.
+    Map<Resource, ResourceRequest> reqMap =
+      (remoteRequests == null) ? null : remoteRequests.get(resourceName);
+    ResourceRequest remoteRequest =
+      (reqMap == null) ? null : reqMap.get(capability);
+    if (remoteRequest == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+            + " is not present in request table");
+      }
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + getAskSize());
+    }
+
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() - 1);
+    if (remoteRequest.getNumContainers() == 0) {
+      // Drop the drained entry and prune any maps left empty by it.
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+      // Remove it from the ask set in case it is still queued there.
+      askLock.lock();
+      try {
+        ask.remove(remoteRequest);
+      } finally {
+        askLock.unlock();
+      }
+    } else {
+      // Queue the entry if the ask set does not already hold it.
+      askLock.lock();
+      try {
+        ask.add(remoteRequest);
+      } finally {
+        askLock.unlock();
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // Logged at debug level to match the guard and the BEFORE message.
+      LOG.debug("AFTER decResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + getAskSize());
+    }
+  }
+  
+  /** Returns the current size of the ask set, read under {@code askLock}. */
+  private int getAskSize() {
+    int pendingAsks;
+    askLock.lock();
+    try {
+      pendingAsks = ask.size();
+    } finally {
+      askLock.unlock();
+    }
+    return pendingAsks;
+  }
+
+  /**
+   * Builds a one-line summary of the allocated, finished and
+   * release-request counters for log output.
+   */
+  private String getStat() {
+    return "ContainersAllocated: " + numContainersAllocated
+        + ", ContainersFinished: " + numFinishedContainers
+        + ", NumContainerReleaseRequests: " + numContainerReleaseRequests;
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void heartbeat() throws Exception {
+    LOG.info("BeforeHeartbeat: " + getStat());
+    int headRoom = getAvailableResources() != null ? getAvailableResources()
+        .getMemory() : 0;// first time it would be null
+    int lastClusterNmCount = clusterNmCount;
+    AllocateResponse response = errorCheckedMakeRemoteRequest();
+    
+    int newHeadRoom = getAvailableResources() != null ? getAvailableResources()
+        .getMemory() : 0;
+    List<Container> newContainers = response.getAllocatedContainers();    
+    logNewContainers(newContainers);
+    numContainersAllocated += newContainers.size();
+    
+    List<ContainerStatus> finishedContainers = response
+        .getCompletedContainersStatuses();
+    logFinishedContainers(finishedContainers);
+    numFinishedContainers += finishedContainers.size();
+    
+    List<NodeReport> updatedNodeReports = response.getUpdatedNodes();
+    logUpdatedNodes(updatedNodeReports);
+ 
+    LOG.info("AfterHeartbeat: " + getStat());
+    
+    if (clusterNmCount != lastClusterNmCount) {
+      LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+          + clusterNmCount);
+      eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
+    }
+    
+    // Inform the Containers about completion..
+    for (ContainerStatus c : finishedContainers) {
+      eventHandler.handle(new AMContainerEventCompleted(c));
+    }
+   
+    // Inform the scheduler about new containers.
+    List<ContainerId> newContainerIds;
+    if (newContainers.size() > 0) {
+      newContainerIds = new ArrayList<ContainerId>(newContainers.size());
+      for (Container container : newContainers) {
+        context.getAllContainers().addContainerIfNew(container);
+        newContainerIds.add(container.getId()); 
+        context.getAllNodes().nodeSeen(container.getNodeId());
+        eventHandler.handle(new AMNodeEventContainerAllocated(container
+            .getNodeId(), container.getId()));
+      }
+      eventHandler.handle(new AMSchedulerEventContainersAllocated(
+          newContainerIds, (newHeadRoom - headRoom != 0)));
+    }
+
+    //Inform the nodes about sate changes.
+    for (NodeReport nr : updatedNodeReports) {
+      eventHandler.handle(new AMNodeEventStateChanged(nr));
+      // Allocator will find out from the node, if at all.
+      // Relying on the RM to not allocated containers on an unhealthy node.
+    }
+  }
+
+  
+  /**
+   * Wraps {@link #makeRemoteRequest()} with retry-window and reboot checks.
+   *
+   * A failed call is rethrown as-is while the retry window (measured from
+   * the last successful call) is still open, leaving the caller free to try
+   * again. Once the window has expired an INTERNAL_ERROR DAG event is raised
+   * and a YarnException carrying the original failure as its cause is
+   * thrown. A response flagged as a reboot also raises INTERNAL_ERROR and
+   * throws.
+   *
+   * @return the allocate response of a successful, non-reboot call
+   * @throws Exception the underlying failure, or a YarnException as above
+   */
+  @SuppressWarnings("unchecked")
+  protected AllocateResponse errorCheckedMakeRemoteRequest() throws Exception {
+    AllocateResponse response = null;
+    try {
+      response = makeRemoteRequest();
+      // Restart the retry window after every successful call.
+      retrystartTime = clock.getTime();
+    } catch (Exception e) {
+      // This can happen when the connection to the RM has gone down. Keep
+      // re-trying until the retryInterval has expired.
+      if (clock.getTime() - retrystartTime >= retryInterval) {
+        String message =
+            "Could not contact RM after " + retryInterval + " milliseconds.";
+        // Keep the root cause in both the log and the thrown exception so
+        // the reason for the outage is not lost.
+        LOG.error(message, e);
+        eventHandler.handle(new DAGEvent(this.getJob().getID(),
+                                         DAGEventType.INTERNAL_ERROR));
+        throw new YarnException(message, e);
+      }
+      // Throw this up to the caller, which may decide to ignore it and
+      // continue to attempt to contact the RM.
+      throw e;
+    }
+    if (response.getReboot()) {
+      // This can happen if the RM has been restarted. If it is in that state,
+      // this application must clean itself up.
+      eventHandler.handle(new DAGEvent(this.getJob().getID(),
+          DAGEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: "
+          + this.getContext().getApplicationID());
+    }
+    return response;
+  }
+  
+  
+  protected AllocateResponse makeRemoteRequest() throws Exception {
+    List<ContainerId> clonedReleaseList = cloneAndClearReleaseList();
+    List<ResourceRequest> clonedAskList = cloneAndClearAskList();
+
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+        applicationAttemptId, lastResponseID, super.getApplicationProgress(),
+        clonedAskList, clonedReleaseList);
+    AllocateResponse allocateResponse = null;
+    try {
+      allocateResponse = scheduler.allocate(allocateRequest);
+    } catch (Exception e) {
+      rePopulateListsOnError(clonedReleaseList, clonedAskList);
+      throw e;
+    }
+    lastResponseID = allocateResponse.getResponseId();
+    availableResources = allocateResponse.getAvailableResources();
+    clusterNmCount = allocateResponse.getNumClusterNodes();
+
+    if (clonedAskList.size() > 0 || clonedReleaseList.size() > 0) {
+      LOG.info("getResources() for " + applicationId + ":" + " ask="
+          + clonedAskList.size() + " release= " + clonedReleaseList.size() 
+          + " newContainers="
+          + allocateResponse.getAllocatedContainers().size()
+          + " finishedContainers="
+          + allocateResponse.getCompletedContainersStatuses().size()
+          + " resourcelimit=" + availableResources + " knownNMs="
+          + clusterNmCount);
+    }
+
+    return allocateResponse;
+  }
+
+  /**
+   * Handles communicator events. Only CONTAINER_DEALLOCATE is acted upon:
+   * the container id is queued for release and the release-request counter
+   * is bumped. All other event types are ignored.
+   *
+   * @param rawEvent the event to handle
+   */
+  @Override
+  public void handle(RMCommunicatorEvent rawEvent) {
+    switch (rawEvent.getType()) {
+    case CONTAINER_DEALLOCATE: {
+      final RMCommunicatorEventContainerDeAllocateRequest deallocate =
+          (RMCommunicatorEventContainerDeAllocateRequest) rawEvent;
+      releaseLock.lock();
+      try {
+        numContainerReleaseRequests++;
+        release.add(deallocate.getContainerId());
+      } finally {
+        releaseLock.unlock();
+      }
+      break;
+    }
+    default:
+      // Nothing to do for other event types.
+      break;
+    }
+  }
+
+
+  private List<ContainerId> cloneAndClearReleaseList() {
+    ArrayList<ContainerId> clonedReleaseList;
+    releaseLock.lock();
+    try {
+      if (release.size() == 0) {
+        return emptyReleaseList;
+      }
+      clonedReleaseList = new ArrayList<ContainerId>(release);
+      release.clear();
+      return clonedReleaseList;
+    } finally {
+      releaseLock.unlock();
+    }
+  }
+
+  private List<ResourceRequest> cloneAndClearAskList() {
+    ArrayList<ResourceRequest> clonedAskList;
+    askLock.lock();
+    try {
+      if (ask.size() == 0) {
+        return emptyAskList;
+      }
+      clonedAskList = new ArrayList<ResourceRequest>(ask);
+      ask.clear();
+      return clonedAskList;
+    } finally {
+      askLock.unlock();
+    }
+  }
+
+  /**
+   * Re-queues the releases and asks that were drained for an allocate call
+   * that failed.
+   *
+   * @param clonedReleaseList release ids that were drained for the call
+   * @param clonedAskList asks that were drained for the call
+   */
+  private void rePopulateListsOnError(List<ContainerId> clonedReleaseList,
+      List<ResourceRequest> clonedAskList) {
+    releaseLock.lock();
+    try {
+      release.addAll(clonedReleaseList);
+    } finally {
+      releaseLock.unlock();
+    }
+
+    // The count asked for a particular resource may have gone up or down
+    // while the call was failing, so the asks are re-derived through the
+    // request table rather than copied back verbatim. The ask collection is
+    // a set, so re-adding an entry it already holds does not duplicate it.
+    askLock.lock();
+    try {
+      rePopulateAskList(clonedAskList);
+    } finally {
+      askLock.unlock();
+    }
+  }
+  
+  private void rePopulateAskList(List<ResourceRequest> clonedAskList) {
+    for (ResourceRequest rr : clonedAskList) {
+      addResourceRequest(rr.getPriority(), rr.getHostName(),
+          rr.getCapability(), 0);
+    }
+  }
+
+  /** Logs a count line plus one line per newly allocated container. */
+  private void logNewContainers(List<Container> newContainers) {
+    if (newContainers.isEmpty()) {
+      return;
+    }
+    LOG.info("Got allocated " + newContainers.size() + " containers");
+    for (Container allocated : newContainers) {
+      LOG.info("AllocatedContainer: " + allocated);
+    }
+  }
+  
+  /** Logs a count line plus one line per finished container status. */
+  private void logFinishedContainers(List<ContainerStatus> finishedContainers) {
+    if (finishedContainers.isEmpty()) {
+      return;
+    }
+    LOG.info(finishedContainers.size() + " containers finished");
+    for (ContainerStatus finished : finishedContainers) {
+      LOG.info("FinihsedContainer: " + finished);
+    }
+  }
+  
+  /** Logs a count line plus one line per updated node report. */
+  private void logUpdatedNodes(List<NodeReport> nodeReports) {
+    if (nodeReports.isEmpty()) {
+      return;
+    }
+    LOG.info(nodeReports.size() + " nodes changed state");
+    for (NodeReport report : nodeReports) {
+      LOG.info("UpdatedNodeReport: " + report);
+    }
+  }
+
+  /** Exposes the live request table (not a copy). */
+  @Private
+  Map<Priority, Map<String, Map<Resource, ResourceRequest>>> getRemoteRequestTable() {
+    return this.remoteRequestsTable;
+  }
+
+  /** Exposes the live ask set (not a copy); {@code askLock} is not taken. */
+  @Private
+  Set<ResourceRequest> getAskSet() {
+    return this.ask;
+  }
+
+  /** Exposes the live release set (not a copy); {@code releaseLock} is not taken. */
+  @Private
+  Set<ContainerId> getReleaseSet() {
+    return this.release;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,460 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.dag.app.rm.AMRMClient.ContainerRequest;
+import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/* TODO not yet updating cluster nodes on every allocate response
+ * from RMContainerRequestor
+   import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
+    if (clusterNmCount != lastClusterNmCount) {
+      LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+          + clusterNmCount);
+      eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
+    }
+ */
+public class TaskScheduler extends AbstractService 
+                             implements AMRMClientAsync.CallbackHandler {
+  private static final Log LOG = LogFactory.getLog(TaskScheduler.class);
+  
+  /**
+   * Callbacks through which the scheduler reports to, and queries, the
+   * application. Upcalls to the app must be made outside the scheduler's
+   * locks.
+   */
+  public interface TaskSchedulerAppCallback {
+
+    /** Final status of the application, as supplied by the app. */
+    public class AppFinalStatus {
+      public final FinalApplicationStatus exitStatus;
+      public final String exitMessage;
+      public final String postCompletionTrackingUrl;
+
+      public AppFinalStatus(FinalApplicationStatus exitStatus,
+          String exitMessage, String postCompletionTrackingUrl) {
+        this.exitStatus = exitStatus;
+        this.exitMessage = exitMessage;
+        this.postCompletionTrackingUrl = postCompletionTrackingUrl;
+      }
+    }
+
+    /** Reports that a container has been assigned to a task. */
+    void taskAllocated(Object task, Object appCookie, Container container);
+
+    /**
+     * Reports a completed container together with the task it was last
+     * allocated to. This may be called for a task+container pair that the
+     * app has not heard about, because of a race between the
+     * taskAllocated() upcall and the deallocateTask() downcall.
+     */
+    void containerCompleted(Object taskLastAllocated,
+        ContainerStatus containerStatus);
+
+    /** Passes on updated node reports. */
+    void nodesUpdated(List<NodeReport> updatedNodes);
+
+    /** Signals that an application reboot has been requested. */
+    void appRebootRequested();
+
+    /** Hands over the data returned when registering with the RM. */
+    void setApplicationRegistrationData(Resource minContainerCapability,
+        Resource maxContainerCapability,
+        Map<ApplicationAccessType, String> appAcls);
+
+    /** Reports an error to the app. */
+    void onError(Exception e);
+
+    /** Asks the app for its progress. */
+    float getProgress();
+
+    /** Asks the app for its final status. */
+    AppFinalStatus getFinalAppStatus();
+  }
+  
+  final AMRMClientAsync<CRCookie> amRmClient; // async client used for all RM interaction
+  final TaskSchedulerAppCallback appClient; // receiver of upcalls to the app
+  
+  Map<Object, ContainerRequest<CRCookie>> taskRequests =  
+                  new HashMap<Object, ContainerRequest<CRCookie>>(); // presumably task -> its outstanding request; confirm against users
+  Map<Object, Container> taskAllocations = 
+                  new HashMap<Object, Container>(); // presumably task -> container assigned to it; confirm against users
+  Map<ContainerId, Object> containerAssigments = 
+                  new HashMap<ContainerId, Object>(); // presumably container id -> task assigned to it; confirm against users
+  HashMap<ContainerId, Object> releasedContainers = 
+                  new HashMap<ContainerId, Object>(); // released container id -> task it was last allocated to
+  
+  final String appHostName; // host name passed to registerApplicationMaster
+  final int appHostPort; // port passed to registerApplicationMaster
+  final String appTrackingUrl; // tracking URL passed to registerApplicationMaster
+  
+  /**
+   * Cookie type carried by container requests: pairs a task with the
+   * app-supplied cookie for it.
+   */
+  class CRCookie {
+    /** The task. */
+    Object task;
+    /** The app-supplied cookie. */
+    Object appCookie;
+  }
+  
+  /**
+   * Creates a scheduler that talks to the RM through a newly created
+   * {@code AMRMClientAsync} constructed with the value 1000 and this
+   * scheduler as its callback handler.
+   *
+   * @param id application attempt id handed to the RM client
+   * @param appClient receiver of upcalls to the app
+   * @param appHostName host name used when registering with the RM
+   * @param appHostPort port used when registering with the RM
+   * @param appTrackingUrl tracking URL used when registering with the RM
+   */
+  public TaskScheduler(ApplicationAttemptId id,
+      TaskSchedulerAppCallback appClient,
+      String appHostName,
+      int appHostPort,
+      String appTrackingUrl) {
+    super(TaskScheduler.class.getName());
+    this.appClient = appClient;
+    this.amRmClient = new AMRMClientAsync<CRCookie>(id, 1000, this);
+    this.appHostName = appHostName;
+    this.appHostPort = appHostPort;
+    this.appTrackingUrl = appTrackingUrl;
+  }
+  
+  @Private
+  @VisibleForTesting
+  /**
+   * Test-only constructor that takes the RM client to use instead of
+   * creating one. The {@code id} argument is not used here.
+   */
+  TaskScheduler(ApplicationAttemptId id,
+      TaskSchedulerAppCallback appClient,
+      String appHostName,
+      int appHostPort,
+      String appTrackingUrl,
+      AMRMClientAsync<CRCookie> client) {
+    super(TaskScheduler.class.getName());
+    this.amRmClient = client;
+    this.appClient = appClient;
+    this.appTrackingUrl = appTrackingUrl;
+    this.appHostName = appHostName;
+    this.appHostPort = appHostPort;
+  }
+  
+  /** Returns the cluster's available resources as reported by the RM client. */
+  public Resource getClusterAvailableResources() {
+    return this.amRmClient.getClusterAvailableResources();
+  }
+  
+  /** Returns the cluster node count as reported by the RM client. */
+  public int getClusterNodeCount() {
+    return this.amRmClient.getClusterNodeCount();
+  }
+  
+  // AbstractService methods
+  /** Initializes this service and then the RM client with the same configuration. */
+  @Override
+  public synchronized void init(Configuration conf) {
+    super.init(conf);
+    this.amRmClient.init(conf);
+  }
+  
+  /**
+   * Starts the RM client and this service, registers the application master
+   * and passes the registration data (min/max container capability and ACLs)
+   * up to the app. A YarnRemoteException is logged and rethrown wrapped in a
+   * YarnException.
+   */
+  @Override
+  public void start() {
+    try {
+      final RegisterApplicationMasterResponse registration;
+      synchronized (this) {
+        amRmClient.start();
+        super.start();
+        registration = amRmClient.registerApplicationMaster(
+            appHostName, appHostPort, appTrackingUrl);
+      }
+      // The upcall to the app is made outside the lock.
+      appClient.setApplicationRegistrationData(
+          registration.getMinimumResourceCapability(),
+          registration.getMaximumResourceCapability(),
+          registration.getApplicationACLs());
+    } catch (YarnRemoteException e) {
+      LOG.error("Exception while registering", e);
+      throw new YarnException(e);
+    }
+  }
+  
+  /**
+   * Fetches the final status from the app, unregisters the application
+   * master with it, then stops the RM client and this service. A
+   * YarnRemoteException is logged and rethrown wrapped in a YarnException.
+   */
+  @Override
+  public void stop() {
+    // The upcall to the app is made before taking the lock.
+    final AppFinalStatus finalStatus = appClient.getFinalAppStatus();
+    try {
+      // FIXME make this optional for the reboot case
+      synchronized (this) {
+        amRmClient.unregisterApplicationMaster(
+            finalStatus.exitStatus,
+            finalStatus.exitMessage,
+            finalStatus.postCompletionTrackingUrl);
+        amRmClient.stop();
+        super.stop();
+      }
+    } catch (YarnRemoteException e) {
+      LOG.error("Exception while unregistering ", e);
+      throw new YarnException(e);
+    }
+  }
+  
+  // AMRMClientAsync interface methods
+  @Override
+  public void onContainersCompleted(List<ContainerStatus> statuses) {
+    Map<Object, ContainerStatus> appContainerStatus = 
+                        new HashMap<Object, ContainerStatus>(statuses.size());
+    synchronized (this) {
+      for(ContainerStatus containerStatus : statuses) {
+        ContainerId completedId = containerStatus.getContainerId();
+        Object task = releasedContainers.remove(completedId);
+        if(task != null){
+          // TODO later we may want to check if exit code matched expectation
+          // e.g. successful container should not come back fail exit code after
+          // being released
+          // completion of a container we had released earlier
+          // an allocated container completed. notify app
+          LOG.info("Released container completed:" + completedId + 
+                   " last allocated to task: " + task);
+          appContainerStatus.put(task, containerStatus);
+          continue;
+        }
+        
+        // not found in released containers. check currently allocated containers
+        // no need to release this container as the RM has already completed it
+        task = unAssignContainer(completedId, false);
+        if(task != null) {
+          // completion of a container we have allocated currently
+          // an allocated container completed. notify app
+          LOG.info("Allocated container completed:" + completedId + 
+                   " last allocated to task: " + task);
+          appContainerStatus.put(task, containerStatus);
+          continue;
+        }
+        
+        // container neither allocated nor released
+        LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());        
+      }
+    }
+    
+    // upcall to app must be outside locks
+    for(Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
+      appClient.containerCompleted(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Handles newly allocated containers.
+   * <p>
+   * For every container a matching outstanding request is searched for at the
+   * container's host, then at the rack resolved for that host, and finally at
+   * {@code ResourceRequest.ANY}. A container that matches nothing is released
+   * again. Matching and bookkeeping run under this object's monitor; the
+   * application callback is invoked after the monitor has been released.
+   *
+   * @param containers the containers that were allocated
+   */
+  @Override
+  public void onContainersAllocated(List<Container> containers) {
+    Map<ContainerRequest<CRCookie>, Container> matchedContainers =
+        new HashMap<ContainerRequest<CRCookie>, Container>(containers.size());
+    synchronized (this) {
+      for (Container container : containers) {
+        // widen the search step by step: host, then rack, then anywhere
+        String location = container.getNodeId().getHost();
+        ContainerRequest<CRCookie> matched = getMatchingRequest(container, location);
+        if (matched == null) {
+          location = RackResolver.resolve(location).getNetworkLocation();
+          matched = getMatchingRequest(container, location);
+          if (matched == null) {
+            location = ResourceRequest.ANY;
+            matched = getMatchingRequest(container, location);
+          }
+        }
+
+        if (matched != null) {
+          Object task = getTask(matched);
+          assert task != null;
+          assignContainer(task, container, matched);
+          matchedContainers.put(matched, container);
+
+          LOG.info("Assigning container: " + container +
+              " for task: " + task +
+              " at locality: " + location);
+        } else {
+          // not matched anything. release container
+          // Probably we cancelled a request and RM allocated that to us
+          // before RM heard of the cancellation
+          releaseContainer(container.getId(), null);
+          LOG.info("No RM requests matching container: " + container);
+        }
+      }
+    }
+
+    // upcall to app must be outside locks
+    for (Entry<ContainerRequest<CRCookie>, Container> match : matchedContainers.entrySet()) {
+      ContainerRequest<CRCookie> request = match.getKey();
+      appClient.taskAllocated(getTask(request), getAppCookie(request), match.getValue());
+    }
+  }
+
+  /**
+   * Relays a reboot request to the application callback.
+   */
+  @Override
+  public void onRebootRequest() {
+    // not synchronized: upcall to app must be outside locks
+    this.appClient.appRebootRequested();
+  }
+
+  /**
+   * Passes node updates straight through to the application callback.
+   *
+   * @param updatedNodes reports for the nodes whose state changed
+   */
+  @Override
+  public void onNodesUpdated(List<NodeReport> updatedNodes) {
+    // ignore bad nodes for now
+    // not synchronized: upcall to app must be outside locks
+    this.appClient.nodesUpdated(updatedNodes);
+  }
+
+  /**
+   * @return the progress value reported by the application callback
+   */
+  @Override
+  public float getProgress() {
+    return this.appClient.getProgress();
+  }
+
+  /**
+   * Hands an error over to the application callback.
+   *
+   * @param e the reported error
+   */
+  @Override
+  public void onError(Exception e) {
+    this.appClient.onError(e);
+  }
+  
+  /**
+   * Registers a container request on behalf of a task.
+   * <p>
+   * The task and the caller's cookie are stored in a {@code CRCookie} attached
+   * to the request so that both can be recovered when a container is matched
+   * to the request later on.
+   *
+   * @param task the task that needs a container
+   * @param capability resources asked for
+   * @param hosts hosts passed to the container request
+   * @param racks racks passed to the container request
+   * @param priority priority of the request
+   * @param clientCookie opaque caller object handed back on allocation
+   */
+  public synchronized void allocateTask(Object task,
+                                        Resource capability,
+                                        String[] hosts,
+                                        String[] racks,
+                                        Priority priority,
+                                        Object clientCookie) {
+    // TODO check for nulls etc
+    // TODO extra memory allocation
+    CRCookie requestCookie = new CRCookie();
+    requestCookie.task = task;
+    requestCookie.appCookie = clientCookie;
+
+    // NOTE(review): the trailing 1 is presumably the container count - confirm
+    ContainerRequest<CRCookie> containerRequest =
+        new ContainerRequest<CRCookie>(capability, hosts, racks, priority, 1);
+    containerRequest.setCookie(requestCookie);
+
+    addTaskRequest(task, containerRequest);
+    LOG.info("Allocation request for task: " + task +
+             " with request: " + containerRequest);
+  }
+  
+  /**
+   * Withdraws a task from the scheduler.
+   * <p>
+   * A task that still has a pending request just has that request removed. A
+   * task that already holds a container is unassigned and its container is
+   * released.
+   *
+   * @param task the task to withdraw
+   * @return the container the task was holding, or {@code null} when the task
+   *         had not been allocated yet or is unknown
+   */
+  public synchronized Container deallocateTask(Object task) {
+    if (removeTaskRequest(task) != null) {
+      // task not allocated yet
+      LOG.info("Deallocating task: " + task + " before allocation");
+      return null;
+    }
+
+    // task request not present. Look in allocations
+    Container released = unAssignContainer(task, true);
+    if (released == null) {
+      // task neither requested nor allocated.
+      LOG.info("Ignoring removal of unknown task: " + task);
+    } else {
+      LOG.info("Deallocated task: " + task +
+               " from container: " + released.getId());
+    }
+    return released;
+  }
+  
+  /**
+   * Unassigns and releases a container.
+   *
+   * @param containerId id of the container to give up
+   * @return the task the container was assigned to, or {@code null} when the
+   *         container is not currently assigned
+   */
+  public synchronized Object deallocateContainer(ContainerId containerId) {
+    Object task = unAssignContainer(containerId, true);
+    if (task == null) {
+      LOG.info("Ignoring dealloction of unknown container: " + containerId);
+    } else {
+      LOG.info("Deallocated container: " + containerId +
+               " from task: " + task);
+    }
+    return task;
+  }
+  
+  /**
+   * Finds an outstanding request that the given container could satisfy at the
+   * given location.
+   * <p>
+   * The lookup is delegated to the AMRM client using the container's priority
+   * and resource; the first request returned is chosen.
+   *
+   * @param container the allocated container
+   * @param location location to match against
+   * @return a matching request, or {@code null} when there is none
+   */
+  private ContainerRequest<CRCookie> getMatchingRequest(Container container, String location) {
+    Priority priority = container.getPriority();
+    Resource capability = container.getResource();
+    Collection<ContainerRequest<CRCookie>> requests =
+        amRmClient.getMatchingRequests(priority, location, capability);
+    if (requests == null) {
+      return null;
+    }
+
+    // TODO maybe do FIFO
+    // Take the element from the iterator that was probed, rather than
+    // creating a second iterator just to call next().
+    Iterator<ContainerRequest<CRCookie>> iterator = requests.iterator();
+    return iterator.hasNext() ? iterator.next() : null;
+  }
+  
+  /**
+   * @param request a request created by this scheduler
+   * @return the task stored in the request's cookie
+   */
+  private Object getTask(ContainerRequest<CRCookie> request) {
+    CRCookie cookie = (CRCookie) request.getCookie();
+    return cookie.task;
+  }
+  
+  /**
+   * @param request a request created by this scheduler
+   * @return the caller-supplied cookie stored in the request's cookie
+   */
+  private Object getAppCookie(ContainerRequest<CRCookie> request) {
+    CRCookie cookie = (CRCookie) request.getCookie();
+    return cookie.appCookie;
+  }
+  
+  /**
+   * Releases a container through the AMRM client and, when a task is given,
+   * records which task it last belonged to.
+   *
+   * @param containerId id of the container to release
+   * @param task task that last used the container; {@code null} skips the
+   *        bookkeeping entry
+   */
+  private void releaseContainer(ContainerId containerId, Object task) {
+    amRmClient.releaseAssignedContainer(containerId);
+    if (task == null) {
+      return;
+    }
+    releasedContainers.put(containerId, task);
+  }
+  
+  /**
+   * Records that a container now belongs to a task: the task's pending request
+   * is removed and both assignment maps are updated.
+   *
+   * @param task the task receiving the container
+   * @param container the container being assigned
+   * @param assigned the request the container was matched against (currently
+   *        only referenced by a disabled assertion)
+   */
+  private void assignContainer(Object task,
+                               Container container,
+                               ContainerRequest<CRCookie> assigned) {
+    ContainerRequest<CRCookie> pending = removeTaskRequest(task);
+    assert pending != null;
+    //assert assigned.equals(pending);
+
+    Container previous = taskAllocations.put(task, container);
+    assert previous == null;
+    containerAssigments.put(container.getId(), task);
+  }
+  
+  /**
+   * Drops the pending request of a task, both locally and in the AMRM client.
+   *
+   * @param task the task whose request should be removed
+   * @return the removed request, or {@code null} when the task had none
+   */
+  private ContainerRequest<CRCookie> removeTaskRequest(Object task) {
+    ContainerRequest<CRCookie> removed = taskRequests.remove(task);
+    if (removed == null) {
+      return null;
+    }
+    // remove all references of the request from AMRMClient
+    amRmClient.removeContainerRequest(removed);
+    return removed;
+  }
+  
+  /**
+   * Records the pending request of a task and submits it to the AMRM client.
+   * <p>
+   * If the task already had a pending request, that older request is withdrawn
+   * from the AMRM client first. Otherwise it would stay registered there with
+   * no entry left in {@code taskRequests} through which it could be removed.
+   *
+   * @param task the task the request belongs to
+   * @param request the request to register
+   */
+  private void addTaskRequest(Object task, ContainerRequest<CRCookie> request) {
+    ContainerRequest<CRCookie> previous = taskRequests.put(task, request);
+    if (previous != null) {
+      // duplicate request for the same task: withdraw the stale one
+      LOG.info("Replacing existing request for task: " + task);
+      amRmClient.removeContainerRequest(previous);
+    }
+    amRmClient.addContainerRequest(request);
+  }
+  
+  /**
+   * Removes the container assignment of a task.
+   *
+   * @param task the task to unassign
+   * @param releaseIfFound whether the container should also be released
+   * @return the container the task was holding, or {@code null} when the task
+   *         holds none
+   */
+  private Container unAssignContainer(Object task, boolean releaseIfFound) {
+    Container container = taskAllocations.remove(task);
+    if (container != null) {
+      containerAssigments.remove(container.getId());
+      if (releaseIfFound) {
+        releaseContainer(container.getId(), task);
+      }
+    }
+    return container;
+  }
+  
+  /**
+   * Removes the task assignment of a container.
+   *
+   * @param containerId id of the container to unassign
+   * @param releaseIfFound whether the container should also be released
+   * @return the task the container was assigned to, or {@code null} when the
+   *         container is not assigned
+   */
+  private Object unAssignContainer(ContainerId containerId,
+                                   boolean releaseIfFound) {
+    Object task = containerAssigments.remove(containerId);
+    if (task != null) {
+      taskAllocations.remove(task);
+      if (releaseIfFound) {
+        releaseContainer(containerId, task);
+      }
+    }
+    return task;
+  }
+
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.client.ClientService;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
+import org.apache.tez.dag.app.rm.container.AMContainerState;
+import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
+import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+
+public class TaskSchedulerEventHandler extends AbstractService
+                                         implements TaskSchedulerAppCallback, 
+                                               EventHandler<AMSchedulerEvent> {
+  static final Log LOG = LogFactory.getLog(TaskSchedulerEventHandler.class);
+
+  protected final AppContext appContext;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  private final TaskScheduler taskScheduler;
+  private DAG job;
+  private Map<ApplicationAccessType, String> appAcls = null;
+  private Thread eventHandlingThread;
+  private volatile boolean stopEventHandling;
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+
+  BlockingQueue<AMSchedulerEvent> eventQueue
+                              = new LinkedBlockingQueue<AMSchedulerEvent>();
+
+  /**
+   * Creates the handler together with the {@link TaskScheduler} it drives.
+   * <p>
+   * The scheduler receives the client service's bind host and port, plus a
+   * {@code host:httpPort} string built from the same host.
+   *
+   * @param appContext application context; also supplies the event handler
+   * @param clientService source of the bind address and HTTP port
+   */
+  public TaskSchedulerEventHandler(AppContext appContext,
+      ClientService clientService) {
+    super(TaskSchedulerEventHandler.class.getName());
+    this.appContext = appContext;
+    this.eventHandler = appContext.getEventHandler();
+
+    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    String hostName = serviceAddr.getHostName();
+    String hostAndHttpPort = hostName + ":" + clientService.getHttpPort();
+    this.taskScheduler = new TaskScheduler(appContext.getApplicationAttemptId(),
+                                           this,
+                                           hostName,
+                                           serviceAddr.getPort(),
+                                           hostAndHttpPort);
+  }
+  
+  /**
+   * @return the ACLs stored by {@code setApplicationRegistrationData}, or
+   *         {@code null} if they have not been set
+   */
+  public Map<ApplicationAccessType, String> getApplicationAcls() {
+    return this.appAcls;
+  }
+  
+  /**
+   * Records whether a signal has been issued and logs the new value.
+   *
+   * @param isSignalled the new value of the flag
+   */
+  public void setSignalled(boolean isSignalled) {
+    LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+    this.isSignalled = isSignalled;
+  }
+  
+  /**
+   * @return the cluster's available resources as reported by the task scheduler
+   */
+  public Resource getAvailableResources() {
+    return this.taskScheduler.getClusterAvailableResources();
+  }
+  
+  /**
+   * Dispatches a scheduler event to its handler method.
+   * <p>
+   * Launch requests, task attempt completions and container deallocations are
+   * acted upon. The remaining event types listed below are accepted but
+   * deliberately do nothing here.
+   *
+   * @param sEvent the event to process
+   * @throws YarnException if a task attempt ended in a state other than
+   *         FAILED, KILLED or SUCCEEDED
+   */
+  public synchronized void handleEvent(AMSchedulerEvent sEvent) {
+    LOG.info("Processing the event " + sEvent.toString());
+    switch (sEvent.getType()) {
+    case S_TA_LAUNCH_REQUEST:
+      handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
+      break;
+    case S_TA_ENDED: { // TaskAttempt considered complete.
+      // braces keep the cast event local to this case
+      AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded) sEvent;
+      switch (event.getState()) {
+      case FAILED:
+      case KILLED:
+        handleTAUnsuccessfulEnd(event);
+        break;
+      case SUCCEEDED:
+        handleTASucceeded(event);
+        break;
+      default:
+        throw new YarnException("Unexpected TA_ENDED state: " + event.getState());
+      }
+      break;
+    }
+    case S_CONTAINER_DEALLOCATE:
+      handleContainerDeallocate((AMSchedulerEventDeallocateContainer) sEvent);
+      break;
+    case S_CONTAINERS_ALLOCATED:
+    case S_CONTAINER_COMPLETED:
+    case S_NODE_BLACKLISTED:
+    case S_NODE_UNHEALTHY:
+    case S_NODE_HEALTHY:
+      // Nothing to do for these event types.
+      // S_NODE_HEALTHY: consider changing this to work like BLACKLISTING.
+      break;
+    }
+  }
+  
+  /**
+   * Queues an event for asynchronous processing by the event handling thread.
+   * <p>
+   * The queue size is logged at every multiple of 1000 and a warning is issued
+   * when fewer than 1000 slots remain.
+   *
+   * @param event the event to enqueue
+   * @throws YarnException if the calling thread is interrupted while waiting
+   *         to enqueue; the thread's interrupt status is restored first
+   */
+  @Override
+  public void handle(AMSchedulerEvent event) {
+    int qSize = eventQueue.size();
+    if (qSize != 0 && qSize % 1000 == 0) {
+      LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+    }
+    int remCapacity = eventQueue.remainingCapacity();
+    if (remCapacity < 1000) {
+      LOG.warn("Very low remaining capacity in the event-queue "
+          + "of RMContainerAllocator: " + remCapacity);
+    }
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      // Catching InterruptedException clears the interrupt status; set it
+      // again so code further up the stack can still see the interruption.
+      Thread.currentThread().interrupt();
+      throw new YarnException(e);
+    }
+  }
+
+  /**
+   * Hands an event to the application's central event handler.
+   *
+   * @param event the event to send
+   */
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    // eventHandler is a raw EventHandler, hence the unchecked call
+    this.eventHandler.handle(event);
+  }
+  
+  
+  private void handleContainerDeallocate(
+                                  AMSchedulerEventDeallocateContainer event) {
+    ContainerId containerId = event.getContainerId();
+    // TODO what happens to the task that was connected to this container?
+    // current assumption is that it will eventually call handleTaStopRequest
+    //TaskAttempt taskAttempt = (TaskAttempt) 
+    taskScheduler.deallocateContainer(containerId);
+    // TODO does this container need to be stopped via C_STOP_REQUEST
+    sendEvent(new AMContainerEvent(containerId,
+                                   AMContainerEventType.C_STOP_REQUEST));
+  }
+  
+  /**
+   * Handles a task attempt that ended as FAILED or KILLED.
+   * <p>
+   * The attempt is withdrawn from the scheduler. If a container id is known
+   * for it, taken from the scheduler or else from the attempt itself, that
+   * container is asked to stop and the node is told that the attempt ended.
+   *
+   * @param event the task-attempt-ended event
+   */
+  private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
+    // TODO XXX: removal may need to be deferred. A SUCCESSFUL taskAttempt can
+    // still fail later, so the scheduler may have to remember taskAttempt to
+    // container assignments for a longer time.
+    TaskAttempt attempt = event.getAttempt();
+    Container container = taskScheduler.deallocateTask(attempt);
+    // Fall back to the container id stored on the attempt in case the
+    // scheduler has already dropped this assignment because the task was
+    // deallocated earlier (retroactive case).
+    ContainerId attemptContainerId = attempt.getAssignedContainerID();
+
+    if (container == null) {
+      LOG.info("Task: " + attempt.getID() +
+               " has no container assignment in the scheduler");
+    } else {
+      // the scheduler still knows the container, so prefer its id
+      ContainerId schedulerContainerId = container.getId();
+      assert attemptContainerId == null
+          || attemptContainerId.equals(schedulerContainerId);
+      attemptContainerId = schedulerContainerId;
+    }
+
+    if (attemptContainerId == null) {
+      return;
+    }
+
+    // TODO either ways send the necessary events
+    // Ask the container to stop.
+    sendEvent(new AMContainerEvent(attemptContainerId,
+                                   AMContainerEventType.C_STOP_REQUEST));
+    // Inform the Node - the task has asked to be STOPPED / has already
+    // stopped.
+    sendEvent(new AMNodeEventTaskAttemptEnded(
+        appContext.getAllContainers().get(attemptContainerId).getContainer().getNodeId(),
+        attemptContainerId,
+        attempt.getID(),
+        event.getState() == TaskAttemptState.FAILED));
+  }
+  
+  /**
+   * Handles a task attempt that ended successfully.
+   * <p>
+   * The attempt is withdrawn from the scheduler. If it was holding a
+   * container, the container and its node are told about the success and the
+   * container is then asked to stop.
+   *
+   * @param event the task-attempt-ended event
+   */
+  private void handleTASucceeded(AMSchedulerEventTAEnded event) {
+    // TODO XXX Remember the assigned containerId even after task success.
+    // Required for TOO_MANY_FETCH_FAILURES
+    TaskAttempt attempt = event.getAttempt();
+    Container container = taskScheduler.deallocateTask(attempt);
+    if (container == null) {
+      return;
+    }
+
+    ContainerId containerId = container.getId();
+    assert containerId.equals(event.getUsedContainerId());
+    sendEvent(new AMContainerEventTASucceeded(containerId,
+                                              event.getAttemptID()));
+    // Inform the Node - the task has asked to be STOPPED / has already
+    // stopped.
+    sendEvent(new AMNodeEventTaskAttemptSucceeded(
+        appContext.getAllContainers().get(containerId).getContainer().getNodeId(),
+        containerId,
+        event.getAttemptID()));
+    // TODO this is where reuse will happen
+    sendEvent(new AMContainerEvent(containerId,
+                                   AMContainerEventType.C_STOP_REQUEST));
+  }
+
+  /*
+  @SuppressWarnings("unchecked")
+  private int maybeComputeNormalizedRequestForType(
+      AMSchedulerTALaunchRequestEvent event, int prevComputedSize) {
+    if (prevComputedSize == 0) {
+      int supportedMaxContainerCapability = appContext.getClusterInfo()
+          .getMaxContainerCapability().getMemory();
+      prevComputedSize = event.getCapability().getMemory();
+      int minSlotMemSize = appContext.getClusterInfo()
+          .getMinContainerCapability().getMemory();
+      prevComputedSize = (int) Math.ceil((float) prevComputedSize
+          / minSlotMemSize)
+          * minSlotMemSize;
+      if (prevComputedSize > supportedMaxContainerCapability) {
+        String diagMsg = " capability required is more than the supported "
+            + "max container capability in the cluster. Killing the Job. "
+            + "ResourceReqt: " + prevComputedSize
+            + " maxContainerCapability:" + supportedMaxContainerCapability;
+        LOG.info(diagMsg);
+        eventHandler.handle(new DAGDiagnosticsUpdateEvent(job.getID(), diagMsg));
+        eventHandler.handle(new DAGEvent(job.getID(), DAGEventType.JOB_KILL));
+      }
+    }
+    return prevComputedSize;
+  }
+  */
+
+  /**
+   * Asks the scheduler for a container for the task attempt named in a launch
+   * request.
+   * <p>
+   * The event itself is passed along as the scheduler cookie, so it comes back
+   * in {@code taskAllocated}.
+   *
+   * @param event the launch request
+   */
+  private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
+    // TODO resource adjustment needs to move into dag
+    taskScheduler.allocateTask(event.getTaskAttempt(),
+                               event.getCapability(),
+                               event.getHosts(),
+                               event.getRacks(),
+                               event.getPriority(),
+                               event);
+  }
+
+  // AbstractService methods
+  @Override
+  public synchronized void init(Configuration conf) {
+    super.init(conf);
+    taskScheduler.init(conf);
+    // todo set heartbeat value from conf here
+  }
+  
+  /**
+   * Starts the task scheduler and the thread that drains the event queue.
+   * <p>
+   * The thread takes events off the queue and passes them to
+   * {@link #handleEvent(AMSchedulerEvent)} until {@code stopEventHandling} is
+   * set. If handling an event throws, an INTERNAL_ERROR DAG event is sent and
+   * the thread exits.
+   */
+  @Override
+  public synchronized void start() {
+    // FIXME hack alert how is this supposed to support multiple DAGs?
+    // Answer: this is shared across dags. need job==app-dag-master
+    job = appContext.getDAG();
+    taskScheduler.start();
+    this.eventHandlingThread = new Thread() {
+      @Override
+      public void run() {
+
+        AMSchedulerEvent event;
+
+        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = TaskSchedulerEventHandler.this.eventQueue.take();
+          } catch (InterruptedException e) {
+            // stop() sets stopEventHandling and then interrupts this thread,
+            // so an interrupt during shutdown is expected and not worth a log
+            // entry. Any other interrupt is reported; the loop condition then
+            // decides whether to carry on.
+            if (!stopEventHandling) {
+              LOG.warn("Continuing after interrupt : ", e);
+            }
+            continue;
+          }
+
+          try {
+            handleEvent(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " to the TaskScheduler", t);
+            // Kill the AM.
+            sendEvent(new DAGEvent(job.getID(), DAGEventType.INTERNAL_ERROR));
+            return;
+          }
+        }
+      }
+    };
+    this.eventHandlingThread.start();
+    super.start();
+  }
+  
+  /**
+   * Stops event handling, then the task scheduler, then this service.
+   */
+  @Override
+  public synchronized void stop() {
+    // raise the flag before interrupting so the handling thread sees it
+    stopEventHandling = true;
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    taskScheduler.stop();
+    super.stop();
+  }
+  
+  // TaskSchedulerAppCallback methods
+  @Override
+  public synchronized void taskAllocated(Object task, 
+                                           Object appCookie, 
+                                           Container container) {
+    /*
+    availableContainerIds.addAll(event.getContainerIds());
+
+    completedMaps = getJob().getCompletedMaps();
+    completedReduces = getJob().getCompletedReduces();
+    int completedTasks = completedMaps + completedReduces;
+
+    if (lastCompletedTasks != completedTasks) {
+      recalculateReduceSchedule = true;
+      lastCompletedTasks = completedTasks;
+    }
+
+    if (event.didHeadroomChange() || event.getContainerIds().size() > 0) {
+      recalculateReduceSchedule = true;
+    }
+    schedule();
+    .....
+          // Update resource requests
+      requestor.decContainerReq(assigned.getContainerRequest());
+  
+      // TODO Maybe: ApplicationACLs should be populated into the appContext from the RMCommunicator.
+      ContainerId containerId = allocated.getId();
+      if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
+        AMSchedulerTALaunchRequestEvent tlrEvent = attemptToLaunchRequestMap
+            .get(assigned.getAttemptId());
+        JobConf jobConf = new JobConf(getJob().getConf());
+  
+        AMContainerEventLaunchRequest launchRequest = new AMContainerEventLaunchRequest(
+            containerId, jobId, assigned.getAttemptId().getTaskId()
+                .getTaskType(), tlrEvent.getJobToken(),
+            tlrEvent.getCredentials(), shouldProfileTaskAttempt(
+                jobConf, tlrEvent.getRemoteTaskContext()), jobConf);
+  
+        eventHandler.handle(launchRequest);
+      }
+      eventHandler.handle(new AMContainerEventAssignTA(containerId,
+          assigned.getAttemptId(), attemptToLaunchRequestMap.get(
+              assigned.getAttemptId()).getRemoteTaskContext()));
+  
+      assignedRequests.add(allocated, assigned.getAttemptId());
+    */
+    
+    ContainerId containerId = container.getId();
+    appContext.getAllContainers().addContainerIfNew(container);
+    appContext.getAllNodes().nodeSeen(container.getNodeId());   
+    sendEvent(new AMNodeEventContainerAllocated(container
+        .getNodeId(), container.getId()));
+    
+    AMSchedulerEventTALaunchRequest event = 
+                         (AMSchedulerEventTALaunchRequest) appCookie;
+    TaskAttempt taskAttempt = event.getTaskAttempt();
+    // TODO - perhaps check if the task still needs this container
+    // because the deallocateTask downcall may have raced with the 
+    // taskAllocated() upcall
+    assert task.equals(taskAttempt);
+    if (appContext.getAllContainers().get(containerId).getState() 
+        == AMContainerState.ALLOCATED) {
+
+      sendEvent(new AMContainerEventLaunchRequest(
+          containerId,
+          taskAttempt.getID().getTaskID().getVertexID(),
+          event.getJobToken(),
+          event.getCredentials(), false, job.getConf(),
+          taskAttempt.getLocalResources(),
+          taskAttempt.getEnvironment()));
+    }
+    sendEvent(new AMContainerEventAssignTA(containerId,
+        taskAttempt.getID(), event.getRemoteTaskContext()));
+  }
+
+  @Override
+  public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) {
+    // Inform the Containers about completion.
+    sendEvent(new AMContainerEventCompleted(containerStatus));
+  }
+  
+  /**
+   * Scheduler callback: node state has changed. One
+   * {@link AMNodeEventStateChanged} is sent per node report.
+   *
+   * @param updatedNodes reports of the nodes that changed
+   */
+  @Override
+  public synchronized void nodesUpdated(List<NodeReport> updatedNodes) {
+    for (NodeReport report : updatedNodes) {
+      // Scheduler will find out from the node, if at all.
+      // Relying on the RM to not allocate containers on an unhealthy node.
+      sendEvent(new AMNodeEventStateChanged(report));
+    }
+  }
+
+  @Override
+  public synchronized void appRebootRequested() {
+    // This can happen if the RM has been restarted. If it is in that state,
+    // this application must clean itself up.
+    // TODO change event to REBOOT
+    // FIXME appReboot != dagReboot
+    // TODO handle multiple dags - Answer: this is shared across dags
+    sendEvent(new DAGEvent(appContext.getDAGID(),
+                           DAGEventType.INTERNAL_ERROR));
+    throw new YarnException("ResourceManager requests reboot for: "
+                             + appContext.getApplicationID());  
+  }
+
+  /**
+   * Scheduler callback: stores the registration data. The container capability
+   * limits go into the application context's cluster info and the ACLs are
+   * kept on this handler.
+   *
+   * @param minContainerCapability smallest container capability
+   * @param maxContainerCapability largest container capability
+   * @param appAcls application ACLs, later returned by
+   *        {@link #getApplicationAcls()}
+   */
+  @Override
+  public synchronized void setApplicationRegistrationData(Resource minContainerCapability,
+      Resource maxContainerCapability,
+      Map<ApplicationAccessType, String> appAcls) {
+    this.appAcls = appAcls;
+    appContext.getClusterInfo().setMinContainerCapability(minContainerCapability);
+    appContext.getClusterInfo().setMaxContainerCapability(maxContainerCapability);
+  }
+
+  @Override
+  public synchronized AppFinalStatus getFinalAppStatus() {
+    FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+    StringBuffer sb = new StringBuffer();
+    if (job == null) {
+      finishState = FinalApplicationStatus.FAILED;
+      sb.append("Job failed to initialize");
+    } else {
+      if (job.getState() == DAGState.SUCCEEDED) {
+      finishState = FinalApplicationStatus.SUCCEEDED;
+      } else if (job.getState() == DAGState.KILLED
+          || (job.getState() == DAGState.RUNNING && isSignalled)) {
+        finishState = FinalApplicationStatus.KILLED;
+      } else if (job.getState() == DAGState.FAILED
+          || job.getState() == DAGState.ERROR) {
+        finishState = FinalApplicationStatus.FAILED;
+      }
+      for (String s : job.getDiagnostics()) {
+        sb.append(s).append("\n");
+      }
+    }
+    LOG.info("Setting job diagnostics to " + sb.toString());
+
+    String historyUrl = "";
+    /*String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
+        appContext.getApplicationID());
+    LOG.info("History url is " + historyUrl);*/
+    
+    return new AppFinalStatus(finishState, sb.toString(), historyUrl);
+  }
+
+  /**
+   * Scheduler callback: reports the progress of the current DAG.
+   *
+   * @return the DAG's progress, or {@code 0} while no DAG is set (the same
+   *         situation {@link #getFinalAppStatus()} handles as "failed to
+   *         initialize")
+   */
+  @Override
+  public synchronized float getProgress() {
+    if (job == null) {
+      // no DAG set: avoid a NullPointerException
+      return 0.0f;
+    }
+    return job.getProgress();
+  }
+
+  @Override
+  public void onError(Exception e) {
+    // TODO Possibly wait for some time and then stop
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * A container that handles {@link AMContainerEvent}s and exposes its state,
+ * identity, shuffle port and the task attempts associated with it.
+ */
+public interface AMContainer extends EventHandler<AMContainerEvent> {
+
+  /** @return the id of this container. */
+  ContainerId getContainerId();
+
+  /** @return the underlying {@link Container} record. */
+  Container getContainer();
+
+  /** @return the current {@link AMContainerState} of this container. */
+  AMContainerState getState();
+
+  /** @return the task attempts queued on this container. */
+  List<TezTaskAttemptID> getQueuedTaskAttempts();
+
+  /** @return the task attempt reported as running on this container. */
+  TezTaskAttemptID getRunningTaskAttempt();
+
+  //TODO Rename - CompletedTaskAttempts, ideally means FAILED / KILLED as well.
+  /** @return the task attempts reported as completed on this container. */
+  List<TezTaskAttemptID> getCompletedTaskAttempts();
+
+  /** @return the shuffle port associated with this container. */
+  int getShufflePort();
+
+  // TODO Add a method to get the containers capabilities - to match taskAttempts.
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base event for a single container, identified by its {@link ContainerId}
+ * and typed by an {@link AMContainerEventType}.
+ */
+public class AMContainerEvent extends AbstractEvent<AMContainerEventType> {
+
+  /** Id of the container this event refers to. */
+  private final ContainerId containerId;
+
+  /**
+   * @param containerId id of the container this event refers to
+   * @param type event type, handed to the {@link AbstractEvent} constructor
+   */
+  public AMContainerEvent(ContainerId containerId, AMContainerEventType type) {
+    super(type);
+    this.containerId = containerId;
+  }
+
+  /** @return the container id supplied at construction. */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * {@link AMContainerEventType#C_ASSIGN_TA} event carrying a task attempt id
+ * together with the remote task context for that attempt.
+ */
+public class AMContainerEventAssignTA extends AMContainerEvent {
+
+  /** Task attempt named by this event. */
+  private final TezTaskAttemptID attemptId;
+  // TODO Maybe have the TAL pull the remoteTask from the TaskAttempt itself ?
+  private final TezTask remoteTaskContext;
+
+  /**
+   * @param containerId id of the container this event refers to
+   * @param attemptId id of the task attempt
+   * @param remoteTaskContext remote task context; it is cast to
+   *          {@link TezTask}, so any other non-null type fails with a
+   *          {@link ClassCastException}
+   */
+  public AMContainerEventAssignTA(ContainerId containerId,
+      TezTaskAttemptID attemptId, Object remoteTaskContext) {
+    super(containerId, AMContainerEventType.C_ASSIGN_TA);
+    final TezTask task = (TezTask) remoteTaskContext;
+    this.attemptId = attemptId;
+    this.remoteTaskContext = task;
+  }
+
+  /** @return the id of the task attempt supplied at construction. */
+  public TezTaskAttemptID getTaskAttemptId() {
+    return attemptId;
+  }
+
+  /** @return the remote task context supplied at construction. */
+  public TezTask getRemoteTaskContext() {
+    return remoteTaskContext;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * {@link AMContainerEventType#C_COMPLETED} event wrapping a
+ * {@link ContainerStatus}.
+ */
+public class AMContainerEventCompleted extends AMContainerEvent {
+
+  /** Status object this event was created from. */
+  private final ContainerStatus containerStatus;
+
+  /**
+   * @param containerStatus status to wrap; the event's container id is read
+   *          from it, so it must not be null
+   */
+  public AMContainerEventCompleted(ContainerStatus containerStatus) {
+    super(containerStatus.getContainerId(), AMContainerEventType.C_COMPLETED);
+    this.containerStatus = containerStatus;
+  }
+
+  /** @return the container status supplied at construction. */
+  public ContainerStatus getContainerStatus() {
+    return containerStatus;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * {@link AMContainerEventType#C_LAUNCH_FAILED} event carrying a message.
+ */
+public class AMContainerEventLaunchFailed extends AMContainerEvent {
+
+  /** Message supplied by the creator of this event. */
+  private final String message;
+
+  /**
+   * @param containerId id of the container this event refers to
+   * @param message message to attach to the event
+   */
+  public AMContainerEventLaunchFailed(ContainerId containerId,
+      String message) {
+    super(containerId, AMContainerEventType.C_LAUNCH_FAILED);
+    this.message = message;
+  }
+
+  /** @return the message supplied at construction. */
+  public String getMessage() {
+    return message;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchFailed.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezVertexID;
+
+/**
+ * {@link AMContainerEventType#C_LAUNCH_REQUEST} event bundling the values
+ * passed along with a launch request: vertex id, job token, credentials,
+ * profiling flag, configuration, local resources and environment.
+ */
+public class AMContainerEventLaunchRequest extends AMContainerEvent {
+
+  private final TezVertexID vertexId;
+  private final Token<JobTokenIdentifier> jobToken;
+  private final Credentials credentials;
+  private final boolean shouldProfile;
+  private final Configuration conf;
+  private final Map<String, LocalResource> localResources;
+  private final Map<String, String> environment;
+
+  /**
+   * Stores every argument as given; the maps are kept by reference and are
+   * not copied.
+   *
+   * @param containerId id of the container this event refers to
+   * @param vertexId vertex id; also used to derive the DAG id
+   * @param jobToken job token
+   * @param credentials credentials
+   * @param shouldProfile profiling flag
+   * @param conf configuration
+   * @param localResources local resources keyed by name
+   * @param environment environment variables
+   */
+  public AMContainerEventLaunchRequest(ContainerId containerId,
+      TezVertexID vertexId,
+      Token<JobTokenIdentifier> jobToken,
+      Credentials credentials, boolean shouldProfile, Configuration conf,
+      Map<String, LocalResource> localResources,
+      Map<String, String> environment) {
+    super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
+    this.environment = environment;
+    this.localResources = localResources;
+    this.conf = conf;
+    this.shouldProfile = shouldProfile;
+    this.credentials = credentials;
+    this.jobToken = jobToken;
+    this.vertexId = vertexId;
+  }
+
+  /** @return the vertex id supplied at construction. */
+  public TezVertexID getVertexId() {
+    return vertexId;
+  }
+
+  /** @return the DAG id obtained from the vertex id. */
+  public TezDAGID getDAGId() {
+    return vertexId.getDAGId();
+  }
+
+  /** @return the job token supplied at construction. */
+  public Token<JobTokenIdentifier> getJobToken() {
+    return jobToken;
+  }
+
+  /** @return the credentials supplied at construction. */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+  /** @return the profiling flag supplied at construction. */
+  public boolean shouldProfile() {
+    return shouldProfile;
+  }
+
+  /** @return the configuration supplied at construction. */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** @return the local resources map supplied at construction. */
+  public Map<String, LocalResource> getLocalResources() {
+    return this.localResources;
+  }
+
+  /** @return the environment map supplied at construction. */
+  public Map<String, String> getEnvironment() {
+    return this.environment;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * {@link AMContainerEventType#C_LAUNCHED} event carrying a shuffle port.
+ */
+public class AMContainerEventLaunched extends AMContainerEvent {
+
+  /** Shuffle port supplied by the creator of this event. */
+  private final int shufflePort;
+
+  /**
+   * @param containerId id of the container this event refers to
+   * @param shufflePort shuffle port to attach to the event
+   */
+  public AMContainerEventLaunched(ContainerId containerId, int shufflePort) {
+    super(containerId, AMContainerEventType.C_LAUNCHED);
+    this.shufflePort = shufflePort;
+  }
+
+  /** @return the shuffle port supplied at construction. */
+  public int getShufflePort() {
+    return shufflePort;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunched.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
+
+public class AMContainerEventNodeFailed extends AMContainerEvent implements
+    DiagnosableEvent {
+
+  private final String message;
+
+  public AMContainerEventNodeFailed(ContainerId containerId, String message) {
+    super(containerId, AMContainerEventType.C_NODE_FAILED);
+    this.message = message;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return message;
+  }
+
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventNodeFailed.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * {@link AMContainerEventType#C_NM_STOP_FAILED} event carrying a message.
+ */
+public class AMContainerEventStopFailed extends AMContainerEvent {
+
+  // TODO XXX Not being used for anything. May be useful if we rely less on
+  // the RM informing the job about container failure.
+
+  /** Message supplied by the creator of this event. */
+  private final String message;
+
+  /**
+   * @param containerId id of the container this event refers to
+   * @param message message to attach to the event
+   */
+  public AMContainerEventStopFailed(ContainerId containerId, String message) {
+    super(containerId, AMContainerEventType.C_NM_STOP_FAILED);
+    this.message = message;
+  }
+
+  /** @return the message supplied at construction. */
+  public String getMessage() {
+    return message;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventStopFailed.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * {@link AMContainerEventType#C_TA_SUCCEEDED} event carrying a task attempt
+ * id.
+ */
+public class AMContainerEventTASucceeded extends AMContainerEvent {
+
+  /** Task attempt named by this event. */
+  private final TezTaskAttemptID attemptId;
+
+  /**
+   * @param containerId id of the container this event refers to
+   * @param attemptId id of the task attempt
+   */
+  public AMContainerEventTASucceeded(ContainerId containerId,
+      TezTaskAttemptID attemptId) {
+    super(containerId, AMContainerEventType.C_TA_SUCCEEDED);
+    this.attemptId = attemptId;
+  }
+
+  /** @return the id of the task attempt supplied at construction. */
+  public TezTaskAttemptID getTaskAttemptId() {
+    return attemptId;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventTASucceeded.java
------------------------------------------------------------------------------
    svn:eol-style = native