You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/23 00:48:21 UTC
svn commit: r1470743 - in
/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce:
ClientCache.java ClientServiceDelegate.java DAGJobStatus.java YARNRunner.java
Author: hitesh
Date: Mon Apr 22 22:48:21 2013
New Revision: 1470743
URL: http://svn.apache.org/r1470743
Log:
YARN-52. Fix YARNRunner to have jobStatus so that it does not throw an exception after job success. (hitesh)
Added:
incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java (with props)
Modified:
incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java?rev=1470743&r1=1470742&r2=1470743&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java Mon Apr 22 22:48:21 2013
@@ -18,36 +18,20 @@
package org.apache.tez.mapreduce;
-import java.io.IOException;
-import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
public class ClientCache {
private final Configuration conf;
private final ResourceMgrDelegate rm;
- private static final Log LOG = LogFactory.getLog(ClientCache.class);
-
private Map<JobID, ClientServiceDelegate> cache =
new HashMap<JobID, ClientServiceDelegate>();
- private MRClientProtocol hsProxy;
-
public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
this.conf = conf;
this.rm = rm;
@@ -55,46 +39,12 @@ public class ClientCache {
//TODO: evict from the cache on some threshold
public synchronized ClientServiceDelegate getClient(JobID jobId) {
- if (hsProxy == null) {
- try {
- hsProxy = instantiateHistoryProxy();
- } catch (IOException e) {
- LOG.warn("Could not connect to History server.", e);
- throw new YarnException("Could not connect to History server.", e);
- }
- }
ClientServiceDelegate client = cache.get(jobId);
if (client == null) {
- client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
+ client = new ClientServiceDelegate(conf, rm, jobId);
cache.put(jobId, client);
}
return client;
}
- protected synchronized MRClientProtocol getInitializedHSProxy()
- throws IOException {
- if (this.hsProxy == null) {
- hsProxy = instantiateHistoryProxy();
- }
- return this.hsProxy;
- }
-
- protected MRClientProtocol instantiateHistoryProxy()
- throws IOException {
- final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
- if (StringUtils.isEmpty(serviceAddr)) {
- return null;
- }
- LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
- final YarnRPC rpc = YarnRPC.create(conf);
- LOG.debug("Connected to HistoryServer at: " + serviceAddr);
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
- @Override
- public MRClientProtocol run() {
- return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
- NetUtils.createSocketAddr(serviceAddr), conf);
- }
- });
- }
}
Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java?rev=1470743&r1=1470742&r2=1470743&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java Mon Apr 22 22:48:21 2013
@@ -18,68 +18,27 @@
package org.apache.tez.mapreduce;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.*;
-import org.apache.hadoop.mapreduce.v2.api.records.*;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ClientToken;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
public class ClientServiceDelegate {
- private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
- private static final String UNAVAILABLE = "N/A";
-
- // Caches for per-user NotRunningJobs
- private HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs;
private final Configuration conf;
- private final JobID jobId;
- private final ApplicationId appId;
private final ResourceMgrDelegate rm;
- private final MRClientProtocol historyServerProxy;
- private MRClientProtocol realProxy = null;
- private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- private static String UNKNOWN_USER = "Unknown User";
- private String trackingUrl;
- private boolean amAclDisabledStatusLogged = false;
+ // FIXME
+ // how to handle completed jobs that the RM does not know about?
public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
- JobID jobId, MRClientProtocol historyServerProxy) {
+ JobID jobId) {
this.conf = new Configuration(conf); // Cloning for modifying.
// For faster redirects from AM to HS.
this.conf.setInt(
@@ -87,387 +46,68 @@ public class ClientServiceDelegate {
this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
this.rm = rm;
- this.jobId = jobId;
- this.historyServerProxy = historyServerProxy;
- this.appId = TypeConverter.toYarn(jobId).getAppId();
- notRunningJobs = new HashMap<JobState, HashMap<String, NotRunningJob>>();
- }
-
- // Get the instance of the NotRunningJob corresponding to the specified
- // user and state
- private NotRunningJob getNotRunningJob(ApplicationReport applicationReport,
- JobState state) {
- synchronized (notRunningJobs) {
- HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
- if (map == null) {
- map = new HashMap<String, NotRunningJob>();
- notRunningJobs.put(state, map);
- }
- String user =
- (applicationReport == null) ?
- UNKNOWN_USER : applicationReport.getUser();
- NotRunningJob notRunningJob = map.get(user);
- if (notRunningJob == null) {
- notRunningJob = new NotRunningJob(applicationReport, state);
- map.put(user, notRunningJob);
- }
- return notRunningJob;
- }
- }
-
- private MRClientProtocol getProxy() throws YarnRemoteException {
- if (realProxy != null) {
- return realProxy;
- }
-
- // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
- // and redirect to the history server.
- ApplicationReport application = rm.getApplicationReport(appId);
- if (application != null) {
- trackingUrl = application.getTrackingUrl();
- }
- InetSocketAddress serviceAddr = null;
- while (application == null
- || YarnApplicationState.RUNNING == application
- .getYarnApplicationState()) {
- if (application == null) {
- LOG.info("Could not get Job info from RM for job " + jobId
- + ". Redirecting to job history server.");
- return checkAndGetHSProxy(null, JobState.NEW);
- }
- try {
- if (application.getHost() == null || "".equals(application.getHost())) {
- LOG.debug("AM not assigned to Job. Waiting to get the AM ...");
- Thread.sleep(2000);
-
- LOG.debug("Application state is " + application.getYarnApplicationState());
- application = rm.getApplicationReport(appId);
- continue;
- } else if (UNAVAILABLE.equals(application.getHost())) {
- if (!amAclDisabledStatusLogged) {
- LOG.info("Job " + jobId + " is running, but the host is unknown."
- + " Verify user has VIEW_JOB access.");
- amAclDisabledStatusLogged = true;
- }
- return getNotRunningJob(application, JobState.RUNNING);
- }
- if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
- UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
- UserGroupInformation.getCurrentUser().getUserName());
- serviceAddr = NetUtils.createSocketAddrForHost(
- application.getHost(), application.getRpcPort());
- if (UserGroupInformation.isSecurityEnabled()) {
- ClientToken clientToken = application.getClientToken();
- Token<ClientTokenIdentifier> token =
- ProtoUtils.convertFromProtoFormat(clientToken, serviceAddr);
- newUgi.addToken(token);
- }
- LOG.debug("Connecting to " + serviceAddr);
- final InetSocketAddress finalServiceAddr = serviceAddr;
- realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
- @Override
- public MRClientProtocol run() throws IOException {
- return instantiateAMProxy(finalServiceAddr);
- }
- });
- } else {
- if (!amAclDisabledStatusLogged) {
- LOG.info("Network ACL closed to AM for job " + jobId
- + ". Not going to try to reach the AM.");
- amAclDisabledStatusLogged = true;
- }
- return getNotRunningJob(null, JobState.RUNNING);
- }
- return realProxy;
- } catch (IOException e) {
- //possibly the AM has crashed
- //there may be some time before AM is restarted
- //keep retrying by getting the address from RM
- LOG.info("Could not connect to " + serviceAddr +
- ". Waiting for getting the latest AM address...");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e1) {
- LOG.warn("getProxy() call interruped", e1);
- throw new YarnException(e1);
- }
- application = rm.getApplicationReport(appId);
- if (application == null) {
- LOG.info("Could not get Job info from RM for job " + jobId
- + ". Redirecting to job history server.");
- return checkAndGetHSProxy(null, JobState.RUNNING);
- }
- } catch (InterruptedException e) {
- LOG.warn("getProxy() call interruped", e);
- throw new YarnException(e);
- }
- }
-
- /** we just want to return if its allocating, so that we don't
- * block on it. This is to be able to return job status
- * on an allocating Application.
- */
- String user = application.getUser();
- if (user == null) {
- throw RPCUtil.getRemoteException("User is not set in the application report");
- }
- if (application.getYarnApplicationState() == YarnApplicationState.NEW
- || application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
- || application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
- realProxy = null;
- return getNotRunningJob(application, JobState.NEW);
- }
-
- if (application.getYarnApplicationState() == YarnApplicationState.FAILED) {
- realProxy = null;
- return getNotRunningJob(application, JobState.FAILED);
- }
-
- if (application.getYarnApplicationState() == YarnApplicationState.KILLED) {
- realProxy = null;
- return getNotRunningJob(application, JobState.KILLED);
- }
-
- //History server can serve a job only if application
- //succeeded.
- if (application.getYarnApplicationState() == YarnApplicationState.FINISHED) {
- LOG.info("Application state is completed. FinalApplicationStatus="
- + application.getFinalApplicationStatus().toString()
- + ". Redirecting to job history server");
- realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED);
- }
- return realProxy;
- }
-
- private MRClientProtocol checkAndGetHSProxy(
- ApplicationReport applicationReport, JobState state) {
- if (null == historyServerProxy) {
- LOG.warn("Job History Server is not configured.");
- return getNotRunningJob(applicationReport, state);
- }
- return historyServerProxy;
- }
-
- MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr)
- throws IOException {
- LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
- YarnRPC rpc = YarnRPC.create(conf);
- MRClientProtocol proxy =
- (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
- serviceAddr, conf);
- LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
- return proxy;
}
- private synchronized Object invoke(String method, Class argClass,
- Object args) throws IOException {
- Method methodOb = null;
- try {
- methodOb = MRClientProtocol.class.getMethod(method, argClass);
- } catch (SecurityException e) {
- throw new YarnException(e);
- } catch (NoSuchMethodException e) {
- throw new YarnException("Method name mismatch", e);
- }
- int maxRetries = this.conf.getInt(
- MRJobConfig.MR_CLIENT_MAX_RETRIES,
- MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
- IOException lastException = null;
- while (maxRetries > 0) {
- try {
- return methodOb.invoke(getProxy(), args);
- } catch (YarnRemoteException yre) {
- LOG.warn("Exception thrown by remote end.", yre);
- throw yre;
- } catch (InvocationTargetException e) {
- if (e.getTargetException() instanceof YarnRemoteException) {
- LOG.warn("Error from remote end: " + e
- .getTargetException().getLocalizedMessage());
- LOG.debug("Tracing remote error ", e.getTargetException());
- throw (YarnRemoteException) e.getTargetException();
- }
- LOG.debug("Failed to contact AM/History for job " + jobId +
- " retrying..", e.getTargetException());
- // Force reconnection by setting the proxy to null.
- realProxy = null;
- // HS/AMS shut down
- maxRetries--;
- lastException = new IOException(e.getMessage());
-
- } catch (Exception e) {
- LOG.debug("Failed to contact AM/History for job " + jobId
- + " Will retry..", e);
- // Force reconnection by setting the proxy to null.
- realProxy = null;
- // RM shutdown
- maxRetries--;
- lastException = new IOException(e.getMessage());
- }
- }
- throw lastException;
- }
-
- public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
- InterruptedException {
- org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
- GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
- request.setJobId(jobID);
- Counters cnt = ((GetCountersResponse)
- invoke("getCounters", GetCountersRequest.class, request)).getCounters();
- return TypeConverter.fromYarn(cnt);
-
+ public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID jobId)
+ throws IOException, InterruptedException {
+ // FIXME needs counters support from DAG
+ // with a translation layer on client side
+ org.apache.hadoop.mapreduce.Counters empty =
+ new org.apache.hadoop.mapreduce.Counters();
+ return empty;
}
- public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobId,
+ int fromEventId, int maxEvents)
throws IOException, InterruptedException {
- org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
- .toYarn(arg0);
- GetTaskAttemptCompletionEventsRequest request = recordFactory
- .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
- request.setJobId(jobID);
- request.setFromEventId(arg1);
- request.setMaxEvents(arg2);
- List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list =
- ((GetTaskAttemptCompletionEventsResponse) invoke(
- "getTaskAttemptCompletionEvents", GetTaskAttemptCompletionEventsRequest.class, request)).
- getCompletionEventList();
- return TypeConverter
- .fromYarn(list
- .toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
+ // FIXME seems like there is support in client to query task failure
+ // related information
+ // However, api does not make sense for DAG
+ return new TaskCompletionEvent[0];
}
- public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+ public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
+ taId)
throws IOException, InterruptedException {
-
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
- .toYarn(arg0);
- GetDiagnosticsRequest request = recordFactory
- .newRecordInstance(GetDiagnosticsRequest.class);
- request.setTaskAttemptId(attemptID);
- List<String> list = ((GetDiagnosticsResponse) invoke("getDiagnostics",
- GetDiagnosticsRequest.class, request)).getDiagnosticsList();
- String[] result = new String[list.size()];
- int i = 0;
- for (String c : list) {
- result[i++] = c.toString();
- }
- return result;
+ // FIXME need support to query task diagnostics?
+ return new String[0];
}
public JobStatus getJobStatus(JobID oldJobID) throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
- GetJobReportRequest request =
- recordFactory.newRecordInstance(GetJobReportRequest.class);
- request.setJobId(jobId);
- JobReport report = ((GetJobReportResponse) invoke("getJobReport",
- GetJobReportRequest.class, request)).getJobReport();
- JobStatus jobStatus = null;
- if (report != null) {
- if (StringUtils.isEmpty(report.getJobFile())) {
- String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
- report.setJobFile(jobFile);
- }
- String historyTrackingUrl = report.getTrackingUrl();
- String url = StringUtils.isNotEmpty(historyTrackingUrl)
- ? historyTrackingUrl : trackingUrl;
- if (!UNAVAILABLE.equals(url)) {
- url = HttpConfig.getSchemePrefix() + url;
- }
- jobStatus = TypeConverter.fromYarn(report, url);
- }
+ ApplicationReport appReport =
+ rm.getApplicationReport(jobId.getAppId());
+ JobStatus jobStatus =
+ new DAGJobStatus(appReport);
return jobStatus;
}
- public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+ public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
+ JobID oldJobID, TaskType taskType)
throws IOException{
- org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
- TypeConverter.toYarn(oldJobID);
- GetTaskReportsRequest request =
- recordFactory.newRecordInstance(GetTaskReportsRequest.class);
- request.setJobId(jobId);
- request.setTaskType(TypeConverter.toYarn(taskType));
-
- List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports =
- ((GetTaskReportsResponse) invoke("getTaskReports", GetTaskReportsRequest.class,
- request)).getTaskReportList();
-
- return TypeConverter.fromYarn
- (taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
+ // FIXME need support to query task reports?
+ throw new UnsupportedOperationException();
}
public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
throws IOException {
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
- = TypeConverter.toYarn(taskAttemptID);
- if (fail) {
- FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
- failRequest.setTaskAttemptId(attemptID);
- invoke("failTaskAttempt", FailTaskAttemptRequest.class, failRequest);
- } else {
- KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
- killRequest.setTaskAttemptId(attemptID);
- invoke("killTaskAttempt", KillTaskAttemptRequest.class, killRequest);
- }
- return true;
+ // FIXME need support to kill a task attempt?
+ throw new UnsupportedOperationException();
}
public boolean killJob(JobID oldJobID)
throws IOException {
- org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
- = TypeConverter.toYarn(oldJobID);
- KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);
- killRequest.setJobId(jobId);
- invoke("killJob", KillJobRequest.class, killRequest);
- return true;
+ // FIXME need support to kill a dag?
+ // Should this be just an RM killApplication?
+ // For one dag per AM, RM kill should suffice
+ throw new UnsupportedOperationException();
}
- public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
+ public LogParams getLogFilePath(JobID oldJobID,
+ TaskAttemptID oldTaskAttemptID)
throws YarnRemoteException, IOException {
- org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
- TypeConverter.toYarn(oldJobID);
- GetJobReportRequest request =
- recordFactory.newRecordInstance(GetJobReportRequest.class);
- request.setJobId(jobId);
-
- JobReport report =
- ((GetJobReportResponse) invoke("getJobReport",
- GetJobReportRequest.class, request)).getJobReport();
- if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
- JobState.ERROR).contains(report.getJobState())) {
- if (oldTaskAttemptID != null) {
- GetTaskAttemptReportRequest taRequest =
- recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
- taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
- TaskAttemptReport taReport =
- ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
- GetTaskAttemptReportRequest.class, taRequest))
- .getTaskAttemptReport();
- if (taReport.getContainerId() == null
- || taReport.getNodeManagerHost() == null) {
- throw new IOException("Unable to get log information for task: "
- + oldTaskAttemptID);
- }
- return new LogParams(
- taReport.getContainerId().toString(),
- taReport.getContainerId().getApplicationAttemptId()
- .getApplicationId().toString(),
- BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
- taReport.getNodeManagerPort()).toString(), report.getUser());
- } else {
- if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
- throw new IOException("Unable to get log information for job: "
- + oldJobID);
- }
- AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
- return new LogParams(
- amInfo.getContainerId().toString(),
- amInfo.getAppAttemptId().getApplicationId().toString(),
- BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
- amInfo.getNodeManagerPort()).toString(), report.getUser());
- }
- } else {
- throw new IOException("Cannot get log path for a in-progress job");
- }
+ // FIXME logs for an attempt?
+ throw new UnsupportedOperationException();
}
}
Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java?rev=1470743&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java Mon Apr 22 22:48:21 2013
@@ -0,0 +1,354 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ * Read-only {@link JobStatus} view backed by a YARN {@link ApplicationReport}.
+ *
+ * <p>Getters compute their answer from the wrapped report on every call. The
+ * setters overridden here reject mutation by throwing
+ * {@link UnsupportedOperationException}.
+ */
+public class DAGJobStatus extends JobStatus {
+
+  /** Application report that all getters read from. */
+  private final ApplicationReport report;
+
+  /**
+   * Wraps the given application report.
+   *
+   * @param appReport report returned by the ResourceManager; stored as-is
+   */
+  public DAGJobStatus(ApplicationReport appReport) {
+    super();
+    this.report = appReport;
+  }
+
+  // ---------------------------------------------------------------------
+  // Mutators: the status is derived from the report, so none is supported.
+  // ---------------------------------------------------------------------
+
+  @Override
+  protected synchronized void setMapProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setCleanupProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setSetupProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setReduceProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setPriority(JobPriority jp) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setFinishTime(long finishTime) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setHistoryFile(String historyFile) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setTrackingUrl(String trackingUrl) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setRetired() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setState(State state) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setStartTime(long startTime) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setUsername(String userName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setSchedulingInfo(String schedulingInfo) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setQueue(String queue) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setFailureInfo(String failureInfo) {
+    throw new UnsupportedOperationException();
+  }
+
+  // ---------------------------------------------------------------------
+  // Getters backed by the application report.
+  // ---------------------------------------------------------------------
+
+  @Override
+  public synchronized String getQueue() {
+    return report.getQueue();
+  }
+
+  /**
+   * @return 1.0 once the application has FINISHED with final status
+   *         SUCCEEDED, otherwise 0.0
+   */
+  @Override
+  public synchronized float getMapProgress() {
+    return progressOnSuccess();
+  }
+
+  /** @return 1.0 once the application state is FINISHED, otherwise 0.0 */
+  @Override
+  public synchronized float getCleanupProgress() {
+    return isInState(YarnApplicationState.FINISHED) ? 1.0f : 0.0f;
+  }
+
+  /**
+   * @return 1.0 when the application is RUNNING with an UNDEFINED final
+   *         status or has already FINISHED, otherwise 0.0
+   */
+  @Override
+  public synchronized float getSetupProgress() {
+    // A FINISHED application must not report less setup progress than the
+    // cleanup/map/reduce getters report for the same job.
+    if (isInState(YarnApplicationState.FINISHED)) {
+      return 1.0f;
+    }
+    boolean runningWithoutVerdict =
+        isInState(YarnApplicationState.RUNNING)
+            && report.getFinalApplicationStatus()
+                == FinalApplicationStatus.UNDEFINED;
+    return runningWithoutVerdict ? 1.0f : 0.0f;
+  }
+
+  /**
+   * @return 1.0 once the application has FINISHED with final status
+   *         SUCCEEDED, otherwise 0.0
+   */
+  @Override
+  public synchronized float getReduceProgress() {
+    return progressOnSuccess();
+  }
+
+  /** @return the YARN state and final status converted by TypeConverter */
+  @Override
+  public synchronized State getState() {
+    return TypeConverter.fromYarn(report.getYarnApplicationState(),
+        report.getFinalApplicationStatus());
+  }
+
+  @Override
+  public synchronized long getStartTime() {
+    return report.getStartTime();
+  }
+
+  /** @return the report's application id converted by TypeConverter */
+  @Override
+  public JobID getJobID() {
+    return TypeConverter.fromYarn(report.getApplicationId());
+  }
+
+  @Override
+  public synchronized String getUsername() {
+    return report.getUser();
+  }
+
+  /** @return the report's tracking URL (same value as getTrackingUrl) */
+  @Override
+  public synchronized String getSchedulingInfo() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public synchronized Map<JobACL, AccessControlList> getJobACLs() {
+    // TODO ACLs are not derived from the report; defer to the superclass.
+    return super.getJobACLs();
+  }
+
+  @Override
+  public synchronized JobPriority getPriority() {
+    // TODO priority is not derived from the report; defer to the superclass.
+    return super.getPriority();
+  }
+
+  /** @return the diagnostics string carried by the report */
+  @Override
+  public synchronized String getFailureInfo() {
+    return report.getDiagnostics();
+  }
+
+  /** @return true when the YARN state is FINISHED, FAILED or KILLED */
+  @Override
+  public synchronized boolean isJobComplete() {
+    return isInState(YarnApplicationState.FINISHED)
+        || isInState(YarnApplicationState.FAILED)
+        || isInState(YarnApplicationState.KILLED);
+  }
+
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    // FIXME serialization is not implemented; nothing is written.
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    // FIXME deserialization is not implemented; nothing is read.
+  }
+
+  @Override
+  public String getJobName() {
+    return report.getName();
+  }
+
+  @Override
+  public String getJobFile() {
+    // FIXME
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized String getTrackingUrl() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public synchronized long getFinishTime() {
+    return report.getFinishTime();
+  }
+
+  @Override
+  public synchronized boolean isRetired() {
+    // FIXME handle retired jobs?
+    return false;
+  }
+
+  @Override
+  public synchronized String getHistoryFile() {
+    // FIXME handle history in status
+    return null;
+  }
+
+  // ---------------------------------------------------------------------
+  // Resource usage. Each getter returns 0 when the report carries no
+  // resource usage sub-report instead of failing with a
+  // NullPointerException.
+  // ---------------------------------------------------------------------
+
+  @Override
+  public int getNumUsedSlots() {
+    org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport usage =
+        report.getApplicationResourceUsageReport();
+    return usage == null ? 0 : usage.getNumUsedContainers();
+  }
+
+  @Override
+  public void setNumUsedSlots(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getNumReservedSlots() {
+    org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport usage =
+        report.getApplicationResourceUsageReport();
+    return usage == null ? 0 : usage.getNumReservedContainers();
+  }
+
+  @Override
+  public void setNumReservedSlots(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getUsedMem() {
+    org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport usage =
+        report.getApplicationResourceUsageReport();
+    return usage == null ? 0 : usage.getUsedResources().getMemory();
+  }
+
+  @Override
+  public void setUsedMem(int m) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getReservedMem() {
+    org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport usage =
+        report.getApplicationResourceUsageReport();
+    return usage == null ? 0 : usage.getReservedResources().getMemory();
+  }
+
+  @Override
+  public void setReservedMem(int r) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getNeededMem() {
+    org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport usage =
+        report.getApplicationResourceUsageReport();
+    return usage == null ? 0 : usage.getNeededResources().getMemory();
+  }
+
+  @Override
+  public void setNeededMem(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized boolean isUber() {
+    return false;
+  }
+
+  @Override
+  public synchronized void setUber(boolean isUber) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Renders the status as comma-separated {@code label : value} pairs so the
+   * individual fields remain distinguishable in log output.
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("job-id : ").append(getJobID());
+    buffer.append(", uber-mode : ").append(isUber());
+    buffer.append(", map-progress : ").append(getMapProgress());
+    buffer.append(", reduce-progress : ").append(getReduceProgress());
+    buffer.append(", cleanup-progress : ").append(getCleanupProgress());
+    buffer.append(", setup-progress : ").append(getSetupProgress());
+    buffer.append(", runstate : ").append(getState());
+    buffer.append(", start-time : ").append(getStartTime());
+    buffer.append(", user-name : ").append(getUsername());
+    buffer.append(", priority : ").append(getPriority());
+    buffer.append(", scheduling-info : ").append(getSchedulingInfo());
+    buffer.append(", num-used-slots : ").append(getNumUsedSlots());
+    buffer.append(", num-reserved-slots : ").append(getNumReservedSlots());
+    buffer.append(", used-mem : ").append(getUsedMem());
+    buffer.append(", reserved-mem : ").append(getReservedMem());
+    buffer.append(", needed-mem : ").append(getNeededMem());
+    return buffer.toString();
+  }
+
+  /** Returns true when the report's YARN state is exactly {@code expected}. */
+  private boolean isInState(YarnApplicationState expected) {
+    return report.getYarnApplicationState() == expected;
+  }
+
+  /** Returns 1.0 for a FINISHED application whose final status is SUCCEEDED. */
+  private float progressOnSuccess() {
+    boolean succeeded =
+        isInState(YarnApplicationState.FINISHED)
+            && report.getFinalApplicationStatus()
+                == FinalApplicationStatus.SUCCEEDED;
+    return succeeded ? 1.0f : 0.0f;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1470743&r1=1470742&r2=1470743&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Mon Apr 22 22:48:21 2013
@@ -864,34 +864,36 @@ public class YARNRunner implements Clien
diagnostics);
}
- while (true) {
- appMasterReport = resMgrDelegate
- .getApplicationReport(applicationId);
- diagnostics =
- (appMasterReport == null ?
- "application report is null" : appMasterReport.getDiagnostics());
- if (appMasterReport == null) {
- throw new IOException("Failed to run job : " +
- diagnostics);
- }
- YarnApplicationState state = appMasterReport.getYarnApplicationState();
- if (state.equals(YarnApplicationState.FAILED)
- || state.equals(YarnApplicationState.FINISHED)
- || state.equals(YarnApplicationState.KILLED)) {
- LOG.info("Job completed"
- + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
- + ", finalState=" + appMasterReport.getYarnApplicationState()
- + ", diagnostics=" + diagnostics);
- break;
- } else {
- LOG.info("Job in progress"
- + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
- + ", finalState=" + appMasterReport.getYarnApplicationState()
- + ", diagnostics=" + diagnostics);
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ while (true) {
+ appMasterReport = resMgrDelegate
+ .getApplicationReport(applicationId);
+ diagnostics =
+ (appMasterReport == null ?
+ "application report is null"
+ : appMasterReport.getDiagnostics());
+ if (appMasterReport == null) {
+ throw new IOException("Failed to run job : " +
+ diagnostics);
+ }
+ YarnApplicationState state = appMasterReport.getYarnApplicationState();
+ if (state.equals(YarnApplicationState.FAILED)
+ || state.equals(YarnApplicationState.FINISHED)
+ || state.equals(YarnApplicationState.KILLED)) {
+ LOG.info("Job completed"
+ + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
+ + ", finalState=" + appMasterReport.getYarnApplicationState()
+ + ", diagnostics=" + diagnostics);
+ break;
+ } else {
+ LOG.info("Job in progress"
+ + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
+ + ", finalState=" + appMasterReport.getYarnApplicationState());
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
}
}