Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/23 00:48:21 UTC

svn commit: r1470743 - in /incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce: ClientCache.java ClientServiceDelegate.java DAGJobStatus.java YARNRunner.java

Author: hitesh
Date: Mon Apr 22 22:48:21 2013
New Revision: 1470743

URL: http://svn.apache.org/r1470743
Log:
YARN-52. Fix YARNRunner to provide a jobStatus so that it does not throw an exception after job success. (hitesh)
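
In effect, job status is now derived on the client from the
ResourceManager's ApplicationReport rather than from an AM or
JobHistoryServer RPC, so a status query issued after the AM exits no
longer fails. A minimal sketch of the resulting flow (the
ResourceMgrDelegate construction and the job id are illustrative, not
part of this commit):

    Configuration conf = new Configuration();
    ResourceMgrDelegate rm =
        new ResourceMgrDelegate(new YarnConfiguration(conf)); // illustrative
    ClientCache cache = new ClientCache(conf, rm);
    JobID jobId = JobID.forName("job_1366668742697_0001");    // hypothetical

    ClientServiceDelegate delegate = cache.getClient(jobId);
    // getJobStatus() wraps the RM's ApplicationReport in a DAGJobStatus,
    // so it keeps answering after the job (and its AM) has finished.
    JobStatus status = delegate.getJobStatus(jobId);
    System.out.println(status.getState() + ", complete=" + status.isJobComplete());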

Added:
    incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java   (with props)
Modified:
    incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
    incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
    incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java

Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java?rev=1470743&r1=1470742&r2=1470743&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java Mon Apr 22 22:48:21 2013
@@ -18,36 +18,20 @@
 
 package org.apache.tez.mapreduce;
 
-import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 
 public class ClientCache {
 
   private final Configuration conf;
   private final ResourceMgrDelegate rm;
 
-  private static final Log LOG = LogFactory.getLog(ClientCache.class);
-
   private Map<JobID, ClientServiceDelegate> cache = 
       new HashMap<JobID, ClientServiceDelegate>();
 
-  private MRClientProtocol hsProxy;
-
   public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
     this.conf = conf;
     this.rm = rm;
@@ -55,46 +39,12 @@ public class ClientCache {
 
   //TODO: evict from the cache on some threshold
   public synchronized ClientServiceDelegate getClient(JobID jobId) {
-    if (hsProxy == null) {
-      try {
-        hsProxy = instantiateHistoryProxy();
-      } catch (IOException e) {
-        LOG.warn("Could not connect to History server.", e);
-        throw new YarnException("Could not connect to History server.", e);
-      }
-    }
     ClientServiceDelegate client = cache.get(jobId);
     if (client == null) {
-      client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
+      client = new ClientServiceDelegate(conf, rm, jobId);
       cache.put(jobId, client);
     }
     return client;
   }
 
-  protected synchronized MRClientProtocol getInitializedHSProxy()
-      throws IOException {
-    if (this.hsProxy == null) {
-      hsProxy = instantiateHistoryProxy();
-    }
-    return this.hsProxy;
-  }
-  
-  protected MRClientProtocol instantiateHistoryProxy()
-      throws IOException {
-    final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
-    if (StringUtils.isEmpty(serviceAddr)) {
-      return null;
-    }
-    LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
-    final YarnRPC rpc = YarnRPC.create(conf);
-    LOG.debug("Connected to HistoryServer at: " + serviceAddr);
-    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-    return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
-      @Override
-      public MRClientProtocol run() {
-        return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
-            NetUtils.createSocketAddr(serviceAddr), conf);
-      }
-    });
-  }
 }
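
With the HistoryServer proxy gone, ClientCache reduces to a per-JobID
map of delegates: no RPC connection is set up front, and repeated
lookups return the same instance. A hedged illustration (the cache and
job id are assumed to be set up as in the sketch above):

    ClientServiceDelegate d1 = cache.getClient(jobId);
    ClientServiceDelegate d2 = cache.getClient(jobId);
    assert d1 == d2; // cached per JobID; nothing is evicted yet (see TODO)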

Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java?rev=1470743&r1=1470742&r2=1470743&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java Mon Apr 22 22:48:21 2013
@@ -18,68 +18,27 @@
 
 package org.apache.tez.mapreduce;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.*;
-import org.apache.hadoop.mapreduce.v2.api.records.*;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ClientToken;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
 
 public class ClientServiceDelegate {
-  private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
-  private static final String UNAVAILABLE = "N/A";
-
-  // Caches for per-user NotRunningJobs
-  private HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs;
 
   private final Configuration conf;
-  private final JobID jobId;
-  private final ApplicationId appId;
   private final ResourceMgrDelegate rm;
-  private final MRClientProtocol historyServerProxy;
-  private MRClientProtocol realProxy = null;
-  private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  private static String UNKNOWN_USER = "Unknown User";
-  private String trackingUrl;
 
-  private boolean amAclDisabledStatusLogged = false;
+  // FIXME
+  // how to handle completed jobs that the RM does not know about?
 
   public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
-      JobID jobId, MRClientProtocol historyServerProxy) {
+      JobID jobId) {
     this.conf = new Configuration(conf); // Cloning for modifying.
     // For faster redirects from AM to HS.
     this.conf.setInt(
@@ -87,387 +46,68 @@ public class ClientServiceDelegate {
         this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
             MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
     this.rm = rm;
-    this.jobId = jobId;
-    this.historyServerProxy = historyServerProxy;
-    this.appId = TypeConverter.toYarn(jobId).getAppId();
-    notRunningJobs = new HashMap<JobState, HashMap<String, NotRunningJob>>();
-  }
-
-  // Get the instance of the NotRunningJob corresponding to the specified
-  // user and state
-  private NotRunningJob getNotRunningJob(ApplicationReport applicationReport,
-      JobState state) {
-    synchronized (notRunningJobs) {
-      HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
-      if (map == null) {
-        map = new HashMap<String, NotRunningJob>();
-        notRunningJobs.put(state, map);
-      }
-      String user =
-          (applicationReport == null) ?
-              UNKNOWN_USER : applicationReport.getUser();
-      NotRunningJob notRunningJob = map.get(user);
-      if (notRunningJob == null) {
-        notRunningJob = new NotRunningJob(applicationReport, state);
-        map.put(user, notRunningJob);
-      }
-      return notRunningJob;
-    }
-  }
-
-  private MRClientProtocol getProxy() throws YarnRemoteException {
-    if (realProxy != null) {
-      return realProxy;
-    }
-    
-    // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
-    // and redirect to the history server.
-    ApplicationReport application = rm.getApplicationReport(appId);
-    if (application != null) {
-      trackingUrl = application.getTrackingUrl();
-    }
-    InetSocketAddress serviceAddr = null;
-    while (application == null
-        || YarnApplicationState.RUNNING == application
-            .getYarnApplicationState()) {
-      if (application == null) {
-        LOG.info("Could not get Job info from RM for job " + jobId
-            + ". Redirecting to job history server.");
-        return checkAndGetHSProxy(null, JobState.NEW);
-      }
-      try {
-        if (application.getHost() == null || "".equals(application.getHost())) {
-          LOG.debug("AM not assigned to Job. Waiting to get the AM ...");
-          Thread.sleep(2000);
-
-          LOG.debug("Application state is " + application.getYarnApplicationState());
-          application = rm.getApplicationReport(appId);
-          continue;
-        } else if (UNAVAILABLE.equals(application.getHost())) {
-          if (!amAclDisabledStatusLogged) {
-            LOG.info("Job " + jobId + " is running, but the host is unknown."
-                + " Verify user has VIEW_JOB access.");
-            amAclDisabledStatusLogged = true;
-          }
-          return getNotRunningJob(application, JobState.RUNNING);
-        }
-        if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
-          UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
-              UserGroupInformation.getCurrentUser().getUserName());
-          serviceAddr = NetUtils.createSocketAddrForHost(
-              application.getHost(), application.getRpcPort());
-          if (UserGroupInformation.isSecurityEnabled()) {
-            ClientToken clientToken = application.getClientToken();
-            Token<ClientTokenIdentifier> token =
-                ProtoUtils.convertFromProtoFormat(clientToken, serviceAddr);
-            newUgi.addToken(token);
-          }
-          LOG.debug("Connecting to " + serviceAddr);
-          final InetSocketAddress finalServiceAddr = serviceAddr;
-          realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
-            @Override
-            public MRClientProtocol run() throws IOException {
-              return instantiateAMProxy(finalServiceAddr);
-            }
-          });
-        } else {
-          if (!amAclDisabledStatusLogged) {
-            LOG.info("Network ACL closed to AM for job " + jobId
-                + ". Not going to try to reach the AM.");
-            amAclDisabledStatusLogged = true;
-          }
-          return getNotRunningJob(null, JobState.RUNNING);
-        }
-        return realProxy;
-      } catch (IOException e) {
-        //possibly the AM has crashed
-        //there may be some time before AM is restarted
-        //keep retrying by getting the address from RM
-        LOG.info("Could not connect to " + serviceAddr +
-        ". Waiting for getting the latest AM address...");
-        try {
-          Thread.sleep(2000);
-        } catch (InterruptedException e1) {
-          LOG.warn("getProxy() call interruped", e1);
-          throw new YarnException(e1);
-        }
-        application = rm.getApplicationReport(appId);
-        if (application == null) {
-          LOG.info("Could not get Job info from RM for job " + jobId
-              + ". Redirecting to job history server.");
-          return checkAndGetHSProxy(null, JobState.RUNNING);
-        }
-      } catch (InterruptedException e) {
-        LOG.warn("getProxy() call interruped", e);
-        throw new YarnException(e);
-      }
-    }
-
-    /** we just want to return if its allocating, so that we don't
-     * block on it. This is to be able to return job status
-     * on an allocating Application.
-     */
-    String user = application.getUser();
-    if (user == null) {
-      throw RPCUtil.getRemoteException("User is not set in the application report");
-    }
-    if (application.getYarnApplicationState() == YarnApplicationState.NEW
-        || application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
-        || application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
-      realProxy = null;
-      return getNotRunningJob(application, JobState.NEW);
-    }
-
-    if (application.getYarnApplicationState() == YarnApplicationState.FAILED) {
-      realProxy = null;
-      return getNotRunningJob(application, JobState.FAILED);
-    }
-
-    if (application.getYarnApplicationState() == YarnApplicationState.KILLED) {
-      realProxy = null;
-      return getNotRunningJob(application, JobState.KILLED);
-    }
-
-    //History server can serve a job only if application
-    //succeeded.
-    if (application.getYarnApplicationState() == YarnApplicationState.FINISHED) {
-      LOG.info("Application state is completed. FinalApplicationStatus="
-          + application.getFinalApplicationStatus().toString()
-          + ". Redirecting to job history server");
-      realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED);
-    }
-    return realProxy;
-  }
-
-  private MRClientProtocol checkAndGetHSProxy(
-      ApplicationReport applicationReport, JobState state) {
-    if (null == historyServerProxy) {
-      LOG.warn("Job History Server is not configured.");
-      return getNotRunningJob(applicationReport, state);
-    }
-    return historyServerProxy;
-  }
-
-  MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr)
-      throws IOException {
-    LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
-    YarnRPC rpc = YarnRPC.create(conf);
-    MRClientProtocol proxy = 
-         (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
-            serviceAddr, conf);
-    LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
-    return proxy;
   }
 
-  private synchronized Object invoke(String method, Class argClass,
-      Object args) throws IOException {
-    Method methodOb = null;
-    try {
-      methodOb = MRClientProtocol.class.getMethod(method, argClass);
-    } catch (SecurityException e) {
-      throw new YarnException(e);
-    } catch (NoSuchMethodException e) {
-      throw new YarnException("Method name mismatch", e);
-    }
-    int maxRetries = this.conf.getInt(
-        MRJobConfig.MR_CLIENT_MAX_RETRIES,
-        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
-    IOException lastException = null;
-    while (maxRetries > 0) {
-      try {
-        return methodOb.invoke(getProxy(), args);
-      } catch (YarnRemoteException yre) {
-        LOG.warn("Exception thrown by remote end.", yre);
-        throw yre;
-      } catch (InvocationTargetException e) {
-        if (e.getTargetException() instanceof YarnRemoteException) {
-          LOG.warn("Error from remote end: " + e
-              .getTargetException().getLocalizedMessage());
-          LOG.debug("Tracing remote error ", e.getTargetException());
-          throw (YarnRemoteException) e.getTargetException();
-        }
-        LOG.debug("Failed to contact AM/History for job " + jobId + 
-            " retrying..", e.getTargetException());
-        // Force reconnection by setting the proxy to null.
-        realProxy = null;
-        // HS/AMS shut down
-        maxRetries--;
-        lastException = new IOException(e.getMessage());
-        
-      } catch (Exception e) {
-        LOG.debug("Failed to contact AM/History for job " + jobId
-            + "  Will retry..", e);
-        // Force reconnection by setting the proxy to null.
-        realProxy = null;
-        // RM shutdown
-        maxRetries--;
-        lastException = new IOException(e.getMessage());     
-      }
-    }
-    throw lastException;
-  }
-
-  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
-  InterruptedException {
-    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
-      GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
-      request.setJobId(jobID);
-      Counters cnt = ((GetCountersResponse)
-          invoke("getCounters", GetCountersRequest.class, request)).getCounters();
-      return TypeConverter.fromYarn(cnt);
-
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID jobId)
+      throws IOException, InterruptedException {
+    // FIXME needs counters support from DAG
+    // with a translation layer on client side
+    org.apache.hadoop.mapreduce.Counters empty =
+        new org.apache.hadoop.mapreduce.Counters();
+    return empty;
   }
 
-  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobId,
+      int fromEventId, int maxEvents)
       throws IOException, InterruptedException {
-    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
-        .toYarn(arg0);
-    GetTaskAttemptCompletionEventsRequest request = recordFactory
-        .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
-    request.setJobId(jobID);
-    request.setFromEventId(arg1);
-    request.setMaxEvents(arg2);
-    List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list =
-      ((GetTaskAttemptCompletionEventsResponse) invoke(
-        "getTaskAttemptCompletionEvents", GetTaskAttemptCompletionEventsRequest.class, request)).
-        getCompletionEventList();
-    return TypeConverter
-        .fromYarn(list
-            .toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
+    // FIXME seems like there is support in client to query task failure
+    // related information
+    // However, api does not make sense for DAG
+    return new TaskCompletionEvent[0];
   }
 
-  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
+      taId)
       throws IOException, InterruptedException {
-
-    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
-        .toYarn(arg0);
-    GetDiagnosticsRequest request = recordFactory
-        .newRecordInstance(GetDiagnosticsRequest.class);
-    request.setTaskAttemptId(attemptID);
-    List<String> list = ((GetDiagnosticsResponse) invoke("getDiagnostics",
-        GetDiagnosticsRequest.class, request)).getDiagnosticsList();
-    String[] result = new String[list.size()];
-    int i = 0;
-    for (String c : list) {
-      result[i++] = c.toString();
-    }
-    return result;
+    // FIXME need support to query task diagnostics?
+    return new String[0];
   }
   
   public JobStatus getJobStatus(JobID oldJobID) throws IOException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
       TypeConverter.toYarn(oldJobID);
-    GetJobReportRequest request =
-        recordFactory.newRecordInstance(GetJobReportRequest.class);
-    request.setJobId(jobId);
-    JobReport report = ((GetJobReportResponse) invoke("getJobReport",
-        GetJobReportRequest.class, request)).getJobReport();
-    JobStatus jobStatus = null;
-    if (report != null) {
-      if (StringUtils.isEmpty(report.getJobFile())) {
-        String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
-        report.setJobFile(jobFile);
-      }
-      String historyTrackingUrl = report.getTrackingUrl();
-      String url = StringUtils.isNotEmpty(historyTrackingUrl)
-          ? historyTrackingUrl : trackingUrl;
-      if (!UNAVAILABLE.equals(url)) {
-        url = HttpConfig.getSchemePrefix() + url;
-      }
-      jobStatus = TypeConverter.fromYarn(report, url);
-    }
+    ApplicationReport appReport =
+        rm.getApplicationReport(jobId.getAppId());
+    JobStatus jobStatus =
+        new DAGJobStatus(appReport);
     return jobStatus;
   }
 
-  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
+      JobID oldJobID, TaskType taskType)
        throws IOException{
-    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
-      TypeConverter.toYarn(oldJobID);
-    GetTaskReportsRequest request =
-        recordFactory.newRecordInstance(GetTaskReportsRequest.class);
-    request.setJobId(jobId);
-    request.setTaskType(TypeConverter.toYarn(taskType));
-
-    List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports =
-      ((GetTaskReportsResponse) invoke("getTaskReports", GetTaskReportsRequest.class,
-          request)).getTaskReportList();
-
-    return TypeConverter.fromYarn
-    (taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
+    // FIXME need support to query task reports?
+    throw new UnsupportedOperationException();
   }
 
   public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
        throws IOException {
-    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
-      = TypeConverter.toYarn(taskAttemptID);
-    if (fail) {
-      FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
-      failRequest.setTaskAttemptId(attemptID);
-      invoke("failTaskAttempt", FailTaskAttemptRequest.class, failRequest);
-    } else {
-      KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
-      killRequest.setTaskAttemptId(attemptID);
-      invoke("killTaskAttempt", KillTaskAttemptRequest.class, killRequest);
-    }
-    return true;
+    // FIXME need support to kill a task attempt?
+    throw new UnsupportedOperationException();
   }
 
   public boolean killJob(JobID oldJobID)
        throws IOException {
-    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
-    = TypeConverter.toYarn(oldJobID);
-    KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);
-    killRequest.setJobId(jobId);
-    invoke("killJob", KillJobRequest.class, killRequest);
-    return true;
+    // FIXME need support to kill a dag?
+    // Should this be just an RM killApplication?
+    // For one dag per AM, RM kill should suffice
+    throw new UnsupportedOperationException();
   }
 
-  public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
+  public LogParams getLogFilePath(JobID oldJobID,
+      TaskAttemptID oldTaskAttemptID)
       throws YarnRemoteException, IOException {
-    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
-        TypeConverter.toYarn(oldJobID);
-    GetJobReportRequest request =
-        recordFactory.newRecordInstance(GetJobReportRequest.class);
-    request.setJobId(jobId);
-
-    JobReport report =
-        ((GetJobReportResponse) invoke("getJobReport",
-            GetJobReportRequest.class, request)).getJobReport();
-    if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
-        JobState.ERROR).contains(report.getJobState())) {
-      if (oldTaskAttemptID != null) {
-        GetTaskAttemptReportRequest taRequest =
-            recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
-        taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
-        TaskAttemptReport taReport =
-            ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
-                GetTaskAttemptReportRequest.class, taRequest))
-                .getTaskAttemptReport();
-        if (taReport.getContainerId() == null
-            || taReport.getNodeManagerHost() == null) {
-          throw new IOException("Unable to get log information for task: "
-              + oldTaskAttemptID);
-        }
-        return new LogParams(
-            taReport.getContainerId().toString(),
-            taReport.getContainerId().getApplicationAttemptId()
-                .getApplicationId().toString(),
-            BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
-                taReport.getNodeManagerPort()).toString(), report.getUser());
-      } else {
-        if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
-          throw new IOException("Unable to get log information for job: "
-              + oldJobID);
-        }
-        AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
-        return new LogParams(
-            amInfo.getContainerId().toString(),
-            amInfo.getAppAttemptId().getApplicationId().toString(),
-            BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
-                amInfo.getNodeManagerPort()).toString(), report.getUser());
-      }
-    } else {
-      throw new IOException("Cannot get log path for a in-progress job");
-    }
+    // FIXME logs for an attempt?
+    throw new UnsupportedOperationException();
   }
 }
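
Note that killJob(), killTask(), getTaskReports() and getLogFilePath()
now throw UnsupportedOperationException until DAG-side equivalents
exist. As the FIXME suggests, with one DAG per AM a caller could fall
back to killing the whole YARN application; a hedged sketch, assuming
ResourceMgrDelegate exposes killApplication() (not shown in this
commit):

    try {
      delegate.killJob(oldJobID);
    } catch (UnsupportedOperationException e) {
      // Hypothetical fallback: one DAG per AM, so killing the YARN
      // application amounts to killing the job.
      rm.killApplication(TypeConverter.toYarn(oldJobID).getAppId());
    }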

Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java?rev=1470743&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java Mon Apr 22 22:48:21 2013
@@ -0,0 +1,354 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+public class DAGJobStatus extends JobStatus {
+
+  private final ApplicationReport report;
+  
+  public DAGJobStatus(ApplicationReport appReport) {
+    super();
+    this.report = appReport;
+  }
+  
+  @Override
+  protected synchronized void setMapProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setCleanupProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setSetupProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setReduceProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setPriority(JobPriority jp) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setFinishTime(long finishTime) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setHistoryFile(String historyFile) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setTrackingUrl(String trackingUrl) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setRetired() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setState(State state) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setStartTime(long startTime) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setUsername(String userName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setSchedulingInfo(String schedulingInfo) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setQueue(String queue) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setFailureInfo(String failureInfo) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized String getQueue() {
+    return report.getQueue();
+  }
+
+  @Override
+  public synchronized float getMapProgress() {
+    if (report.getYarnApplicationState().equals(
+        YarnApplicationState.FINISHED)
+        && report.getFinalApplicationStatus().equals(
+            FinalApplicationStatus.SUCCEEDED)) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  @Override
+  public synchronized float getCleanupProgress() {
+    if (report.getYarnApplicationState().equals(
+        YarnApplicationState.FINISHED)) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  @Override
+  public synchronized float getSetupProgress() {
+    if (report.getYarnApplicationState().equals(
+        YarnApplicationState.RUNNING)
+        && report.getFinalApplicationStatus().equals(
+            FinalApplicationStatus.UNDEFINED)) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  @Override
+  public synchronized float getReduceProgress() {
+    if (report.getYarnApplicationState().equals(
+        YarnApplicationState.FINISHED)
+        && report.getFinalApplicationStatus().equals(
+            FinalApplicationStatus.SUCCEEDED)) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  @Override
+  public synchronized State getState() {
+    return TypeConverter.fromYarn(report.getYarnApplicationState(),
+        report.getFinalApplicationStatus());
+  }
+
+  @Override
+  public synchronized long getStartTime() {
+    return report.getStartTime();
+  }
+
+  @Override
+  public JobID getJobID() {
+    return TypeConverter.fromYarn(report.getApplicationId());
+  }
+
+  @Override
+  public synchronized String getUsername() {
+    return report.getUser();
+  }
+
+  @Override
+  public synchronized String getSchedulingInfo() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public synchronized Map<JobACL, AccessControlList> getJobACLs() {
+    // TODO Auto-generated method stub
+    return super.getJobACLs();
+  }
+
+  @Override
+  public synchronized JobPriority getPriority() {
+    // TODO Auto-generated method stub
+    return super.getPriority();
+  }
+
+  @Override
+  public synchronized String getFailureInfo() {
+    return report.getDiagnostics();
+  }
+
+  @Override
+  public synchronized boolean isJobComplete() {
+    return (report.getYarnApplicationState().equals(
+        YarnApplicationState.FINISHED)
+        || report.getYarnApplicationState().equals(
+            YarnApplicationState.FAILED)
+        || report.getYarnApplicationState().equals(
+            YarnApplicationState.KILLED));
+  }
+
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    // FIXME
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    // FIXME
+  }
+
+  @Override
+  public String getJobName() {
+    return report.getName();
+  }
+
+  @Override
+  public String getJobFile() {
+    // FIXME
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized String getTrackingUrl() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public synchronized long getFinishTime() {
+    return report.getFinishTime();
+  }
+
+  @Override
+  public synchronized boolean isRetired() {
+    // FIXME handle retired jobs?
+    return false;
+  }
+
+  @Override
+  public synchronized String getHistoryFile() {
+    // FIXME handle history in status
+    return null;
+  }
+
+  @Override
+  public int getNumUsedSlots() {
+    return report.getApplicationResourceUsageReport().getNumUsedContainers();
+  }
+
+  @Override
+  public void setNumUsedSlots(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getNumReservedSlots() {
+    return report.getApplicationResourceUsageReport().
+        getNumReservedContainers();
+  }
+
+  @Override
+  public void setNumReservedSlots(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getUsedMem() {
+    return report.getApplicationResourceUsageReport().
+        getUsedResources().getMemory();
+  }
+
+  @Override
+  public void setUsedMem(int m) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getReservedMem() {
+    return report.getApplicationResourceUsageReport().
+        getReservedResources().getMemory();
+  }
+
+  @Override
+  public void setReservedMem(int r) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getNeededMem() {
+    return report.getApplicationResourceUsageReport().
+        getNeededResources().getMemory();
+  }
+
+  @Override
+  public void setNeededMem(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized boolean isUber() {
+    return false;
+  }
+
+  @Override
+  public synchronized void setUber(boolean isUber) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer buffer = new StringBuffer();
+    buffer.append("job-id : " + getJobID());
+    buffer.append("uber-mode : " + isUber());
+    buffer.append("map-progress : " + getMapProgress());
+    buffer.append("reduce-progress : " + getReduceProgress());
+    buffer.append("cleanup-progress : " + getCleanupProgress());
+    buffer.append("setup-progress : " + getSetupProgress());
+    buffer.append("runstate : " + getState());
+    buffer.append("start-time : " + getStartTime());
+    buffer.append("user-name : " + getUsername());
+    buffer.append("priority : " + getPriority());
+    buffer.append("scheduling-info : " + getSchedulingInfo());
+    buffer.append("num-used-slots" + getNumUsedSlots());
+    buffer.append("num-reserved-slots" + getNumReservedSlots());
+    buffer.append("used-mem" + getUsedMem());
+    buffer.append("reserved-mem" + getReservedMem());
+    buffer.append("needed-mem" + getNeededMem());
+    return buffer.toString();
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native
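
DAGJobStatus approximates the MR phases from coarse application state:
setup reads 1.0 only while the application is RUNNING with an UNDEFINED
final status, map and reduce read 1.0 only on FINISHED + SUCCEEDED, and
cleanup reads 1.0 on any FINISHED application. For a finished,
successful application the observable values are therefore:

    // report: state=FINISHED, finalStatus=SUCCEEDED (hypothetical values)
    JobStatus status = new DAGJobStatus(report);
    status.getSetupProgress();    // 0.0f (1.0f only while RUNNING)
    status.getMapProgress();      // 1.0f
    status.getReduceProgress();   // 1.0f
    status.getCleanupProgress();  // 1.0f
    status.isJobComplete();       // true
    status.getState();            // State.SUCCEEDED via TypeConverter.fromYarn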

Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1470743&r1=1470742&r2=1470743&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Mon Apr 22 22:48:21 2013
@@ -864,34 +864,36 @@ public class YARNRunner implements Clien
         diagnostics);
     }
 
-    while (true) {
-      appMasterReport = resMgrDelegate
-          .getApplicationReport(applicationId);
-      diagnostics =
-          (appMasterReport == null ?
-              "application report is null" : appMasterReport.getDiagnostics());
-      if (appMasterReport == null) {
-        throw new IOException("Failed to run job : " +
-          diagnostics);
-      }
-      YarnApplicationState state = appMasterReport.getYarnApplicationState();
-      if (state.equals(YarnApplicationState.FAILED)
-          || state.equals(YarnApplicationState.FINISHED)
-          || state.equals(YarnApplicationState.KILLED)) {
-        LOG.info("Job completed"
-            + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
-            + ", finalState=" + appMasterReport.getYarnApplicationState()
-            + ", diagnostics=" + diagnostics);
-        break;
-      } else {
-        LOG.info("Job in progress"
-            + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
-            + ", finalState=" + appMasterReport.getYarnApplicationState()
-            + ", diagnostics=" + diagnostics);
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
+    if (LOG.isDebugEnabled()) {
+      while (true) {
+        appMasterReport = resMgrDelegate
+            .getApplicationReport(applicationId);
+        diagnostics =
+            (appMasterReport == null ?
+                "application report is null"
+                : appMasterReport.getDiagnostics());
+        if (appMasterReport == null) {
+          throw new IOException("Failed to run job : " +
+            diagnostics);
+        }
+        YarnApplicationState state = appMasterReport.getYarnApplicationState();
+        if (state.equals(YarnApplicationState.FAILED)
+            || state.equals(YarnApplicationState.FINISHED)
+            || state.equals(YarnApplicationState.KILLED)) {
+          LOG.info("Job completed"
+              + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
+              + ", finalState=" + appMasterReport.getYarnApplicationState()
+              + ", diagnostics=" + diagnostics);
+          break;
+        } else {
+          LOG.info("Job in progress"
+              + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
+              + ", finalState=" + appMasterReport.getYarnApplicationState());
+        }
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
       }
     }
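
The submit-side polling loop above now runs only when debug logging is
enabled; in the normal path, completion is observed through the job
status instead, e.g. via the standard MR client API (a sketch, not part
of this commit):

    Job job = Job.getInstance(conf);                // standard MR client API
    // ... configure and submit; YARNRunner backs the client protocol ...
    boolean success = job.waitForCompletion(true);  // polls getJobStatus()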