Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [15/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java Sat Nov 28 20:26:01 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.MRConfig;
/**
* IsolationRunner is intended to facilitate debugging by re-running a specific
@@ -40,7 +41,7 @@
* Currently, it is limited to re-running map tasks.
*
* Users may coerce MapReduce to keep task files around by setting
- * keep.failed.task.files. See mapred_tutorial.xml for more documentation.
+ * mapreduce.task.files.preserve.failedtasks. See mapred_tutorial.xml for more documentation.
*/
public class IsolationRunner {
private static final Log LOG =
@@ -64,6 +65,10 @@
LOG.info("Task " + taskId + " reporting shuffle error: " + message);
}
+ public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
+ LOG.info("Task " + taskId + " reporting fatal error: " + msg);
+ }
+
public JvmTask getTask(JvmContext context) throws IOException {
return null;
}
@@ -149,21 +154,21 @@
return false;
}
JobConf conf = new JobConf(new Path(jobFilename.toString()));
- TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
+ TaskAttemptID taskId = TaskAttemptID.forName(conf.get(JobContext.TASK_ATTEMPT_ID));
if (taskId == null) {
- System.out.println("mapred.task.id not found in configuration;" +
+ System.out.println("mapreduce.task.attempt.id not found in configuration;" +
" job.xml is not a task config");
}
- boolean isMap = conf.getBoolean("mapred.task.is.map", true);
+ boolean isMap = conf.getBoolean(JobContext.TASK_ISMAP, true);
if (!isMap) {
System.out.println("Only map tasks are supported.");
return false;
}
- int partition = conf.getInt("mapred.task.partition", 0);
+ int partition = conf.getInt(JobContext.TASK_PARTITION, 0);
// setup the local and user working directories
FileSystem local = FileSystem.getLocal(conf);
- LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
local.setWorkingDirectory(new Path(workDirName.toString()));
@@ -179,9 +184,9 @@
// any of the configured local disks, so use LocalDirAllocator to find out
// where it is.
Path localSplit =
- new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
- TaskTracker.getLocalSplitFile(taskId.getJobID().toString(), taskId
- .toString()), conf);
+ new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
+ TaskTracker.getLocalSplitFile(conf.getUser(), taskId.getJobID()
+ .toString(), taskId.toString()), conf);
DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
String splitClass = Text.readString(splitFile);
BytesWritable split = new BytesWritable();
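The IsolationRunner hunks above swap hard-coded configuration keys for the constants in JobContext and MRConfig. A minimal sketch of reading task identity the new way, assuming a task-level job.xml at a hypothetical path (the key-to-constant mapping follows the diff itself, e.g. JobContext.TASK_ATTEMPT_ID resolving to "mapreduce.task.attempt.id"):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRConfig;

public class TaskConfigSketch {
  public static void main(String[] args) {
    // Hypothetical path to a preserved task attempt's job.xml.
    JobConf conf = new JobConf(new Path("job.xml"));

    // Constant keys replace the old string literals:
    //   "mapred.task.id"        -> JobContext.TASK_ATTEMPT_ID
    //   "mapred.task.is.map"    -> JobContext.TASK_ISMAP
    //   "mapred.task.partition" -> JobContext.TASK_PARTITION
    //   "mapred.local.dir"      -> MRConfig.LOCAL_DIR
    TaskAttemptID taskId =
        TaskAttemptID.forName(conf.get(JobContext.TASK_ATTEMPT_ID));
    boolean isMap = conf.getBoolean(JobContext.TASK_ISMAP, true);
    int partition = conf.getInt(JobContext.TASK_PARTITION, 0);
    String localDirs = conf.get(MRConfig.LOCAL_DIR);

    System.out.println(taskId + " map=" + isMap + " partition=" + partition
        + " localDirs=" + localDirs);
  }
}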
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JSPUtil.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JSPUtil.java Sat Nov 28 20:26:01 2009
@@ -27,12 +27,15 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import javax.servlet.jsp.JspWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobHistory.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.ServletUtil;
import org.apache.hadoop.util.StringUtils;
@@ -46,7 +49,7 @@
new LinkedHashMap<String, JobInfo>();
private static final int CACHE_SIZE =
- conf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5);
+ conf.getInt(JTConfig.JT_JOBHISTORY_CACHE_SIZE, 5);
private static final Log LOG = LogFactory.getLog(JSPUtil.class);
/**
@@ -257,6 +260,43 @@
return sb.toString();
}
+ @SuppressWarnings("unchecked")
+ public static void generateRetiredJobXml(JspWriter out, JobTracker tracker, int rowId)
+ throws IOException {
+
+ Iterator<JobStatus> iterator =
+ tracker.retireJobs.getAll().descendingIterator();
+
+ for (int i = 0; i < 100 && iterator.hasNext(); i++) {
+ JobStatus status = iterator.next();
+ StringBuilder sb = new StringBuilder();
+ sb.append("<retired_job rowid=\"" + rowId + "\" jobid=\"" + status.getJobId() + "\">");
+ sb.append("<jobid>" + status.getJobId() + "</jobid>");
+ sb.append("<history_url>jobdetailshistory.jsp?jobid=" + status.getJobId()
+ + "&logFile="
+ + URLEncoder.encode(status.getHistoryFile().toString(), "UTF-8")
+ + "</history_url>");
+ sb.append("<priority>" + status.getJobPriority().toString()
+ + "</priority>");
+ sb.append("<user>" + status.getUsername() + "</user>");
+ sb.append("<name>" + status.getJobName() + "</name>");
+ sb.append("<run_state>" + JobStatus.getJobRunState(status.getRunState())
+ + "</run_state>");
+ sb.append("<start_time>" + new Date(status.getStartTime())
+ + "</start_time>");
+ sb.append("<finish_time>" + new Date(status.getFinishTime())
+ + "</finish_time>");
+ sb.append("<map_complete>" + StringUtils.formatPercent(
+ status.mapProgress(), 2) + "</map_complete>");
+ sb.append("<reduce_complete>" + StringUtils.formatPercent(
+ status.reduceProgress(), 2) + "</reduce_complete>");
+ sb.append("<scheduling_info>" + status.getSchedulingInfo() + "</scheduling_info>");
+ sb.append("</retired_job>\n");
+ out.write(sb.toString());
+ rowId++;
+ }
+ }
+
static final boolean privateActionsAllowed() {
return conf.getBoolean(PRIVATE_ACTIONS_KEY, false);
}
@@ -268,10 +308,10 @@
synchronized(jobHistoryCache) {
JobInfo jobInfo = jobHistoryCache.remove(jobid);
if (jobInfo == null) {
- jobInfo = new JobHistory.JobInfo(jobid);
+ JobHistoryParser parser = new JobHistoryParser(fs, logFile);
+ jobInfo = parser.parse();
LOG.info("Loading Job History file "+jobid + ". Cache size is " +
jobHistoryCache.size());
- DefaultJobHistoryParser.parseJobTasks( logFile, jobInfo, fs) ;
}
jobHistoryCache.put(jobid, jobInfo);
if (jobHistoryCache.size() > CACHE_SIZE) {
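The JSPUtil change collapses the old two-step history load (construct a JobHistory.JobInfo, then run DefaultJobHistoryParser.parseJobTasks over it) into a single JobHistoryParser call. A minimal usage sketch, assuming a FileSystem handle and the Path of a completed job's history file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

public class HistorySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path logFile = new Path(args[0]);   // history file of a completed job

    // One parse() call replaces the old parseJobTasks() sequence.
    JobHistoryParser parser = new JobHistoryParser(fs, logFile);
    JobInfo jobInfo = parser.parse();

    System.out.println("Parsed job " + jobInfo.getJobId());
  }
}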
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java Sat Nov 28 20:26:01 2009
@@ -17,60 +17,28 @@
*/
package org.apache.hadoop.mapred;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URL;
-import java.net.URLConnection;
-import java.net.UnknownHostException;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
-import javax.security.auth.login.LoginException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.mapreduce.tools.CLI;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -156,16 +124,15 @@
* @see ClusterStatus
* @see Tool
* @see DistributedCache
+ * @deprecated Use {@link Job} and {@link Cluster} instead
*/
-public class JobClient extends Configured implements MRConstants, Tool {
- private static final Log LOG = LogFactory.getLog(JobClient.class);
+@Deprecated
+public class JobClient extends CLI {
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
- private static final long MAX_JOBPROFILE_AGE = 1000 * 2;
static{
- Configuration.addDefaultResource("mapred-default.xml");
- Configuration.addDefaultResource("mapred-site.xml");
+ ConfigUtil.loadResources();
}
/**
@@ -174,9 +141,7 @@
* remote service to provide certain functionality.
*/
class NetworkedJob implements RunningJob {
- JobStatus status;
- long statustime;
-
+ Job job;
/**
* We store a JobProfile and a timestamp for when we last
* acquired the job profile. If the job is null, then we cannot
@@ -184,66 +149,47 @@
* has completely forgotten about the job. (eg, 24 hours after the
* job completes.)
*/
- public NetworkedJob(JobStatus job) throws IOException {
- this.status = job;
- this.statustime = System.currentTimeMillis();
+ public NetworkedJob(JobStatus status) throws IOException {
+ job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
}
- /**
- * Some methods rely on having a recent job profile object. Refresh
- * it, if necessary
- */
- synchronized void ensureFreshStatus() throws IOException {
- if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
- updateStatus();
- }
- }
-
- /** Some methods need to update status immediately. So, refresh
- * immediately
- * @throws IOException
- */
- synchronized void updateStatus() throws IOException {
- this.status = jobSubmitClient.getJobStatus(status.getJobID());
- if (this.status == null) {
- throw new IOException("Job status not available ");
- }
- this.statustime = System.currentTimeMillis();
+ public NetworkedJob(Job job) throws IOException {
+ this.job = job;
}
/**
* An identifier for the job
*/
public JobID getID() {
- return status.getJobID();
+ return JobID.downgrade(job.getID());
}
/** @deprecated This method is deprecated and will be removed. Applications should
* rather use {@link #getID()}.*/
@Deprecated
public String getJobID() {
- return status.getJobID().toString();
+ return getID().toString();
}
/**
* The user-specified job name
*/
public String getJobName() {
- return status.getJobName();
+ return job.getJobName();
}
/**
* The name of the job file
*/
public String getJobFile() {
- return status.getJobFile();
+ return job.getJobFile();
}
/**
* A URL where the job's status can be seen
*/
public String getTrackingURL() {
- return status.getTrackingUrl();
+ return job.getTrackingURL();
}
/**
@@ -251,8 +197,11 @@
* completed.
*/
public float mapProgress() throws IOException {
- ensureFreshStatus();
- return status.mapProgress();
+ try {
+ return job.mapProgress();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -260,8 +209,11 @@
* completed.
*/
public float reduceProgress() throws IOException {
- ensureFreshStatus();
- return status.reduceProgress();
+ try {
+ return job.reduceProgress();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -269,8 +221,11 @@
* completed.
*/
public float cleanupProgress() throws IOException {
- ensureFreshStatus();
- return status.cleanupProgress();
+ try {
+ return job.cleanupProgress();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -278,37 +233,45 @@
* completed.
*/
public float setupProgress() throws IOException {
- ensureFreshStatus();
- return status.setupProgress();
+ try {
+ return job.setupProgress();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
* Returns immediately whether the whole job is done yet or not.
*/
public synchronized boolean isComplete() throws IOException {
- updateStatus();
- return (status.getRunState() == JobStatus.SUCCEEDED ||
- status.getRunState() == JobStatus.FAILED ||
- status.getRunState() == JobStatus.KILLED);
+ try {
+ return job.isComplete();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
* True iff job completed successfully.
*/
public synchronized boolean isSuccessful() throws IOException {
- updateStatus();
- return status.getRunState() == JobStatus.SUCCEEDED;
+ try {
+ return job.isSuccessful();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
* Blocks until the job is finished
*/
public void waitForCompletion() throws IOException {
- while (!isComplete()) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ie) {
- }
+ try {
+ job.waitForCompletion(false);
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ } catch (ClassNotFoundException ce) {
+ throw new IOException(ce);
}
}
@@ -316,15 +279,22 @@
* Tells the service to get the state of the current job.
*/
public synchronized int getJobState() throws IOException {
- updateStatus();
- return status.getRunState();
+ try {
+ return job.getJobState().getValue();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
* Tells the service to terminate the current job.
*/
public synchronized void killJob() throws IOException {
- jobSubmitClient.killJob(getID());
+ try {
+ job.killJob();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
@@ -333,7 +303,12 @@
*/
public synchronized void setJobPriority(String priority)
throws IOException {
- jobSubmitClient.setJobPriority(getID(), priority);
+ try {
+ job.setPriority(
+ org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -342,8 +317,17 @@
* @param shouldFail if true the task is failed and added to failed tasks list, otherwise
* it is just killed, w/o affecting job failure status.
*/
- public synchronized void killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
- jobSubmitClient.killTask(taskId, shouldFail);
+ public synchronized void killTask(TaskAttemptID taskId,
+ boolean shouldFail) throws IOException {
+ try {
+ if (shouldFail) {
+ job.failTask(taskId);
+ } else {
+ job.killTask(taskId);
+ }
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
@@ -356,9 +340,18 @@
* Fetch task completion events from jobtracker for this job.
*/
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
- int startFrom) throws IOException{
- return jobSubmitClient.getTaskCompletionEvents(
- getID(), startFrom, 10);
+ int startFrom) throws IOException {
+ try {
+ org.apache.hadoop.mapreduce.TaskCompletionEvent[] acls =
+ job.getTaskCompletionEvents(startFrom, 10);
+ TaskCompletionEvent[] ret = new TaskCompletionEvent[acls.length];
+ for (int i = 0 ; i < acls.length; i++ ) {
+ ret[i] = TaskCompletionEvent.downgrade(acls[i]);
+ }
+ return ret;
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -366,48 +359,52 @@
*/
@Override
public String toString() {
- try {
- updateStatus();
- } catch (IOException e) {
- }
- return "Job: " + status.getJobID() + "\n" +
- "status: " + JobStatus.getJobRunState(status.getRunState()) + "\n" +
- "file: " + status.getJobFile() + "\n" +
- "tracking URL: " + status.getTrackingUrl() + "\n" +
- "map() completion: " + status.mapProgress() + "\n" +
- "reduce() completion: " + status.reduceProgress() + "\n" +
- "history URL: " + status.getHistoryFile() + "\n" +
- "retired: " + status.isRetired();
+ return job.toString();
}
/**
* Returns the counters for this job
*/
public Counters getCounters() throws IOException {
- return jobSubmitClient.getJobCounters(getID());
+ try {
+ return Counters.downgrade(job.getCounters());
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
@Override
public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
- return jobSubmitClient.getTaskDiagnostics(id);
+ try {
+ return job.getTaskDiagnostics(id);
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
public String getHistoryUrl() throws IOException {
- updateStatus();
- return status.getHistoryFile();
+ try {
+ return job.getHistoryUrl();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
public boolean isRetired() throws IOException {
- updateStatus();
- return status.isRetired();
+ try {
+ return job.isRetired();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+
+ boolean monitorAndPrintJob() throws IOException, InterruptedException {
+ return job.monitorAndPrintJob();
}
}
- private JobSubmissionProtocol jobSubmitClient;
- private Path sysDir = null;
+ Cluster cluster;
- private FileSystem fs = null;
-
/**
* Create a job client.
*/
@@ -422,7 +419,6 @@
* @throws IOException
*/
public JobClient(JobConf conf) throws IOException {
- setConf(conf);
init(conf);
}
@@ -434,7 +430,6 @@
* @throws IOException
*/
public JobClient(Configuration conf) throws IOException {
- setConf(conf);
init(new JobConf(conf));
}
@@ -444,19 +439,8 @@
* @throws IOException
*/
public void init(JobConf conf) throws IOException {
- String tracker = conf.get("mapred.job.tracker", "local");
- if ("local".equals(tracker)) {
- this.jobSubmitClient = new LocalJobRunner(conf);
- } else {
- this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
- }
- }
-
- private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
- NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+ setConf(conf);
+ cluster = new Cluster(conf);
}
/**
@@ -467,16 +451,14 @@
*/
public JobClient(InetSocketAddress jobTrackAddr,
Configuration conf) throws IOException {
- jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
+ cluster = new Cluster(jobTrackAddr, conf);
}
/**
* Close the <code>JobClient</code>.
*/
public synchronized void close() throws IOException {
- if (!(jobSubmitClient instanceof LocalJobRunner)) {
- RPC.stopProxy(jobSubmitClient);
- }
+ cluster.close();
}
/**
@@ -486,208 +468,11 @@
* @return the filesystem handle.
*/
public synchronized FileSystem getFs() throws IOException {
- if (this.fs == null) {
- Path sysDir = getSystemDir();
- this.fs = sysDir.getFileSystem(getConf());
- }
- return fs;
- }
-
- /* see if two file systems are the same or not
- *
- */
- private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
- URI srcUri = srcFs.getUri();
- URI dstUri = destFs.getUri();
- if (srcUri.getScheme() == null) {
- return false;
- }
- if (!srcUri.getScheme().equals(dstUri.getScheme())) {
- return false;
- }
- String srcHost = srcUri.getHost();
- String dstHost = dstUri.getHost();
- if ((srcHost != null) && (dstHost != null)) {
- try {
- srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
- dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
- } catch(UnknownHostException ue) {
- return false;
- }
- if (!srcHost.equals(dstHost)) {
- return false;
- }
- }
- else if (srcHost == null && dstHost != null) {
- return false;
- }
- else if (srcHost != null && dstHost == null) {
- return false;
- }
- //check for ports
- if (srcUri.getPort() != dstUri.getPort()) {
- return false;
- }
- return true;
- }
-
- // copies a file to the jobtracker filesystem and returns the path where it
- // was copied to
- private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath,
- JobConf job, short replication) throws IOException {
- //check if we do not need to copy the files
- // is jt using the same file system.
- // just checking for uri strings... doing no dns lookups
- // to see if the filesystems are the same. This is not optimal.
- // but avoids name resolution.
-
- FileSystem remoteFs = null;
- remoteFs = originalPath.getFileSystem(job);
- if (compareFs(remoteFs, jtFs)) {
- return originalPath;
- }
- // this might have name collisions. copy will throw an exception
- //parse the original path to create new path
- Path newPath = new Path(parentDir, originalPath.getName());
- FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job);
- jtFs.setReplication(newPath, replication);
- return newPath;
- }
-
- /**
- * configure the jobconf of the user with the command line options of
- * -libjars, -files, -archives
- * @param conf
- * @throws IOException
- */
- private void configureCommandLineOptions(JobConf job, Path submitJobDir, Path submitJarFile)
- throws IOException {
-
- if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
- LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
- "Applications should implement Tool for the same.");
- }
-
- // Retrieve command line arguments placed into the JobConf
- // by GenericOptionsParser.
- String files = job.get("tmpfiles");
- String libjars = job.get("tmpjars");
- String archives = job.get("tmparchives");
-
- /*
- * set this user's id in job configuration, so later job files can be
- * accessed using this user's id
- */
- UnixUserGroupInformation ugi = getUGI(job);
-
- //
- // Figure out what fs the JobTracker is using. Copy the
- // job to it, under a temporary name. This allows DFS to work,
- // and under the local fs also provides UNIX-like object loading
- // semantics. (that is, if the job file is deleted right after
- // submission, we can still run the submission to completion)
- //
-
- // Create a number of filenames in the JobTracker's fs namespace
- FileSystem fs = getFs();
- LOG.debug("default FileSystem: " + fs.getUri());
- fs.delete(submitJobDir, true);
- submitJobDir = fs.makeQualified(submitJobDir);
- submitJobDir = new Path(submitJobDir.toUri().getPath());
- FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
- FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
- Path filesDir = new Path(submitJobDir, "files");
- Path archivesDir = new Path(submitJobDir, "archives");
- Path libjarsDir = new Path(submitJobDir, "libjars");
- short replication = (short)job.getInt("mapred.submit.replication", 10);
- // add all the command line files/ jars and archive
- // first copy them to jobtrackers filesystem
-
- if (files != null) {
- FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
- String[] fileArr = files.split(",");
- for (String tmpFile: fileArr) {
- Path tmp = new Path(tmpFile);
- Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication);
- try {
- URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
- DistributedCache.addCacheFile(pathURI, job);
- } catch(URISyntaxException ue) {
- //should not throw a uri exception
- throw new IOException("Failed to create uri for " + tmpFile);
- }
- DistributedCache.createSymlink(job);
- }
- }
-
- if (libjars != null) {
- FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
- String[] libjarsArr = libjars.split(",");
- for (String tmpjars: libjarsArr) {
- Path tmp = new Path(tmpjars);
- Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
- DistributedCache.addFileToClassPath(newPath, job);
- }
- }
-
-
- if (archives != null) {
- FileSystem.mkdirs(fs, archivesDir, mapredSysPerms);
- String[] archivesArr = archives.split(",");
- for (String tmpArchives: archivesArr) {
- Path tmp = new Path(tmpArchives);
- Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);
- try {
- URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
- DistributedCache.addCacheArchive(pathURI, job);
- } catch(URISyntaxException ue) {
- //should not throw an uri excpetion
- throw new IOException("Failed to create uri for " + tmpArchives);
- }
- DistributedCache.createSymlink(job);
- }
- }
-
- // set the timestamps of the archives and files
- TrackerDistributedCacheManager.determineTimestamps(job);
-
- String originalJarPath = job.getJar();
-
- if (originalJarPath != null) { // copy jar to JobTracker's fs
- // use jar name if job is not named.
- if ("".equals(job.getJobName())){
- job.setJobName(new Path(originalJarPath).getName());
- }
- job.setJar(submitJarFile.toString());
- fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
- fs.setReplication(submitJarFile, replication);
- fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
- } else {
- LOG.warn("No job jar file set. User classes may not be found. "+
- "See JobConf(Class) or JobConf#setJar(String).");
- }
-
- // Set the user's name and working directory
- job.setUser(ugi.getUserName());
- if (ugi.getGroupNames().length > 0) {
- job.set("group.name", ugi.getGroupNames()[0]);
- }
- if (job.getWorkingDirectory() == null) {
- job.setWorkingDirectory(fs.getWorkingDirectory());
- }
-
- }
-
-
- private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
- UnixUserGroupInformation ugi = null;
- try {
- ugi = UnixUserGroupInformation.login(job, true);
- } catch (LoginException e) {
- throw (IOException)(new IOException(
- "Failed to get the current user's information.").initCause(e));
+ try {
+ return cluster.getFileSystem();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
}
- return ugi;
}
/**
@@ -711,29 +496,26 @@
return submitJob(job);
}
- // job files are world-wide readable and owner writable
- final private static FsPermission JOB_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
-
- // job submission directory is world readable/writable/executable
- final static FsPermission JOB_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
-
/**
* Submit a job to the MR system.
* This returns a handle to the {@link RunningJob} which can be used to track
* the running-job.
*
- * @param job the job configuration.
+ * @param conf the job configuration.
* @return a handle to the {@link RunningJob} which can be used to track the
* running-job.
* @throws FileNotFoundException
* @throws IOException
*/
- public RunningJob submitJob(JobConf job) throws FileNotFoundException,
+ public RunningJob submitJob(JobConf conf) throws FileNotFoundException,
IOException {
try {
- return submitJobInternal(job);
+ conf.setBooleanIfUnset("mapred.mapper.new-api", false);
+ conf.setBooleanIfUnset("mapred.reducer.new-api", false);
+ Job job = Job.getInstance(cluster, conf);
+ job.submit();
+ conf.setUser(job.getUser());
+ return new NetworkedJob(job);
} catch (InterruptedException ie) {
throw new IOException("interrupted", ie);
} catch (ClassNotFoundException cnfe) {
@@ -741,187 +523,6 @@
}
}
- /**
- * Internal method for submitting jobs to the system.
- * @param job the configuration to submit
- * @return a proxy object for the running job
- * @throws FileNotFoundException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- * @throws IOException
- */
- public
- RunningJob submitJobInternal(JobConf job
- ) throws FileNotFoundException,
- ClassNotFoundException,
- InterruptedException,
- IOException {
- /*
- * configure the command line options correctly on the submitting dfs
- */
-
- JobID jobId = jobSubmitClient.getNewJobId();
- Path submitJobDir = new Path(getSystemDir(), jobId.toString());
- Path submitJarFile = new Path(submitJobDir, "job.jar");
- Path submitSplitFile = new Path(submitJobDir, "job.split");
- configureCommandLineOptions(job, submitJobDir, submitJarFile);
- Path submitJobFile = new Path(submitJobDir, "job.xml");
- int reduces = job.getNumReduceTasks();
- JobContext context = new JobContext(job, jobId);
-
- // Check the output specification
- if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
- org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
- ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
- output.checkOutputSpecs(context);
- } else {
- job.getOutputFormat().checkOutputSpecs(getFs(), job);
- }
-
- // Create the splits for the job
- LOG.debug("Creating splits at " + getFs().makeQualified(submitSplitFile));
- int maps;
- if (job.getUseNewMapper()) {
- maps = writeNewSplits(context, submitSplitFile);
- } else {
- maps = writeOldSplits(job, submitSplitFile);
- }
- job.set("mapred.job.split.file", submitSplitFile.toString());
- job.setNumMapTasks(maps);
-
- // Write job file to JobTracker's fs
- FSDataOutputStream out =
- FileSystem.create(getFs(), submitJobFile,
- new FsPermission(JOB_FILE_PERMISSION));
-
- try {
- job.writeXml(out);
- } finally {
- out.close();
- }
-
- //
- // Now, actually submit the job (using the submit name)
- //
- JobStatus status = jobSubmitClient.submitJob(jobId);
- if (status != null) {
- return new NetworkedJob(status);
- } else {
- throw new IOException("Could not launch job");
- }
- }
-
- private int writeOldSplits(JobConf job,
- Path submitSplitFile) throws IOException {
- InputSplit[] splits =
- job.getInputFormat().getSplits(job, job.getNumMapTasks());
- // sort the splits into order based on size, so that the biggest
- // go first
- Arrays.sort(splits, new Comparator<InputSplit>() {
- public int compare(InputSplit a, InputSplit b) {
- try {
- long left = a.getLength();
- long right = b.getLength();
- if (left == right) {
- return 0;
- } else if (left < right) {
- return 1;
- } else {
- return -1;
- }
- } catch (IOException ie) {
- throw new RuntimeException("Problem getting input split size",
- ie);
- }
- }
- });
- DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length);
-
- try {
- DataOutputBuffer buffer = new DataOutputBuffer();
- RawSplit rawSplit = new RawSplit();
- for(InputSplit split: splits) {
- rawSplit.setClassName(split.getClass().getName());
- buffer.reset();
- split.write(buffer);
- rawSplit.setDataLength(split.getLength());
- rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
- rawSplit.setLocations(split.getLocations());
- rawSplit.write(out);
- }
- } finally {
- out.close();
- }
- return splits.length;
- }
-
- private static class NewSplitComparator
- implements Comparator<org.apache.hadoop.mapreduce.InputSplit>{
-
- @Override
- public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
- org.apache.hadoop.mapreduce.InputSplit o2) {
- try {
- long len1 = o1.getLength();
- long len2 = o2.getLength();
- if (len1 < len2) {
- return 1;
- } else if (len1 == len2) {
- return 0;
- } else {
- return -1;
- }
- } catch (IOException ie) {
- throw new RuntimeException("exception in compare", ie);
- } catch (InterruptedException ie) {
- throw new RuntimeException("exception in compare", ie);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private <T extends org.apache.hadoop.mapreduce.InputSplit>
- int writeNewSplits(JobContext job, Path submitSplitFile
- ) throws IOException, InterruptedException,
- ClassNotFoundException {
- JobConf conf = job.getJobConf();
- org.apache.hadoop.mapreduce.InputFormat<?,?> input =
- ReflectionUtils.newInstance(job.getInputFormatClass(), job.getJobConf());
-
- List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);
- T[] array = (T[])
- splits.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits.size()]);
-
- // sort the splits into order based on size, so that the biggest
- // go first
- Arrays.sort(array, new NewSplitComparator());
- DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile,
- array.length);
- try {
- if (array.length != 0) {
- DataOutputBuffer buffer = new DataOutputBuffer();
- RawSplit rawSplit = new RawSplit();
- SerializationFactory factory = new SerializationFactory(conf);
- Serializer<T> serializer =
- factory.getSerializer((Class<T>) array[0].getClass());
- serializer.open(buffer);
- for(T split: array) {
- rawSplit.setClassName(split.getClass().getName());
- buffer.reset();
- serializer.serialize(split);
- rawSplit.setDataLength(split.getLength());
- rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
- rawSplit.setLocations(split.getLocations());
- rawSplit.write(out);
- }
- serializer.close();
- }
- } finally {
- out.close();
- }
- return array.length;
- }
-
/**
* Checks if the job directory is clean and has all the required components
* for (re) starting the job
@@ -953,125 +554,6 @@
return false;
}
- static class RawSplit implements Writable {
- private String splitClass;
- private BytesWritable bytes = new BytesWritable();
- private String[] locations;
- long dataLength;
-
- public void setBytes(byte[] data, int offset, int length) {
- bytes.set(data, offset, length);
- }
-
- public void setClassName(String className) {
- splitClass = className;
- }
-
- public String getClassName() {
- return splitClass;
- }
-
- public BytesWritable getBytes() {
- return bytes;
- }
-
- public void clearBytes() {
- bytes = null;
- }
-
- public void setLocations(String[] locations) {
- this.locations = locations;
- }
-
- public String[] getLocations() {
- return locations;
- }
-
- public void readFields(DataInput in) throws IOException {
- splitClass = Text.readString(in);
- dataLength = in.readLong();
- bytes.readFields(in);
- int len = WritableUtils.readVInt(in);
- locations = new String[len];
- for(int i=0; i < len; ++i) {
- locations[i] = Text.readString(in);
- }
- }
-
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, splitClass);
- out.writeLong(dataLength);
- bytes.write(out);
- WritableUtils.writeVInt(out, locations.length);
- for(int i = 0; i < locations.length; i++) {
- Text.writeString(out, locations[i]);
- }
- }
-
- public long getDataLength() {
- return dataLength;
- }
- public void setDataLength(long l) {
- dataLength = l;
- }
-
- }
-
- private static final int CURRENT_SPLIT_FILE_VERSION = 0;
- private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-
- private DataOutputStream writeSplitsFileHeader(Configuration conf,
- Path filename,
- int length
- ) throws IOException {
- // write the splits to a file for the job tracker
- FileSystem fs = filename.getFileSystem(conf);
- FSDataOutputStream out =
- FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
- out.write(SPLIT_FILE_HEADER);
- WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
- WritableUtils.writeVInt(out, length);
- return out;
- }
-
- /** Create the list of input splits and write them out in a file for
- *the JobTracker. The format is:
- * <format version>
- * <numSplits>
- * for each split:
- * <RawSplit>
- * @param splits the input splits to write out
- * @param out the stream to write to
- */
- private void writeOldSplitsFile(InputSplit[] splits,
- FSDataOutputStream out) throws IOException {
- }
-
- /**
- * Read a splits file into a list of raw splits
- * @param in the stream to read from
- * @return the complete list of splits
- * @throws IOException
- */
- static RawSplit[] readSplitFile(DataInput in) throws IOException {
- byte[] header = new byte[SPLIT_FILE_HEADER.length];
- in.readFully(header);
- if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
- throw new IOException("Invalid header on split file");
- }
- int vers = WritableUtils.readVInt(in);
- if (vers != CURRENT_SPLIT_FILE_VERSION) {
- throw new IOException("Unsupported split version " + vers);
- }
- int len = WritableUtils.readVInt(in);
- RawSplit[] result = new RawSplit[len];
- for(int i=0; i < len; ++i) {
- result[i] = new RawSplit();
- result[i].readFields(in);
- }
- return result;
- }
-
/**
* Get an {@link RunningJob} object to track an ongoing job. Returns
* null if the id does not correspond to any known job.
@@ -1082,12 +564,18 @@
* @throws IOException
*/
public RunningJob getJob(JobID jobid) throws IOException {
- JobStatus status = jobSubmitClient.getJobStatus(jobid);
- if (status != null) {
- return new NetworkedJob(status);
- } else {
- return null;
+ try {
+ Job job = cluster.getJob(jobid);
+ if (job != null) {
+ JobStatus status = JobStatus.downgrade(job.getStatus());
+ if (status != null) {
+ return new NetworkedJob(status);
+ }
+ }
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
}
+ return null;
}
/**@deprecated Applications should rather use {@link #getJob(JobID)}.
@@ -1105,7 +593,12 @@
* @throws IOException
*/
public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
- return jobSubmitClient.getMapTaskReports(jobId);
+ try {
+ return TaskReport.downgradeArray(
+ cluster.getJob(jobId).getTaskReports(TaskType.MAP));
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
@@ -1122,7 +615,12 @@
* @throws IOException
*/
public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
- return jobSubmitClient.getReduceTaskReports(jobId);
+ try {
+ return TaskReport.downgradeArray(
+ cluster.getJob(jobId).getTaskReports(TaskType.REDUCE));
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -1133,7 +631,12 @@
* @throws IOException
*/
public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
- return jobSubmitClient.getCleanupTaskReports(jobId);
+ try {
+ return TaskReport.downgradeArray(
+ cluster.getJob(jobId).getTaskReports(TaskType.JOB_CLEANUP));
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -1144,9 +647,15 @@
* @throws IOException
*/
public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
- return jobSubmitClient.getSetupTaskReports(jobId);
+ try {
+ return TaskReport.downgradeArray(
+ cluster.getJob(jobId).getTaskReports(TaskType.JOB_SETUP));
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
+
/**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
@Deprecated
public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
@@ -1164,37 +673,13 @@
*/
public void displayTasks(JobID jobId, String type, String state)
throws IOException {
- TaskReport[] reports = new TaskReport[0];
- if (type.equals("map")) {
- reports = getMapTaskReports(jobId);
- } else if (type.equals("reduce")) {
- reports = getReduceTaskReports(jobId);
- } else if (type.equals("setup")) {
- reports = getSetupTaskReports(jobId);
- } else if (type.equals("cleanup")) {
- reports = getCleanupTaskReports(jobId);
- }
- for (TaskReport report : reports) {
- TIPStatus status = report.getCurrentStatus();
- if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
- (state.equals("running") && status ==TIPStatus.RUNNING) ||
- (state.equals("completed") && status == TIPStatus.COMPLETE) ||
- (state.equals("failed") && status == TIPStatus.FAILED) ||
- (state.equals("killed") && status == TIPStatus.KILLED)) {
- printTaskAttempts(report);
- }
- }
- }
- private void printTaskAttempts(TaskReport report) {
- if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
- System.out.println(report.getSuccessfulTaskAttempt());
- } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
- for (TaskAttemptID t :
- report.getRunningTaskAttempts()) {
- System.out.println(t);
- }
+ try {
+ super.displayTasks(cluster.getJob(jobId), type, state);
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
}
}
+
/**
* Get status information about the Map-Reduce cluster.
*
@@ -1203,7 +688,38 @@
* @throws IOException
*/
public ClusterStatus getClusterStatus() throws IOException {
- return getClusterStatus(false);
+ try {
+ ClusterMetrics metrics = cluster.getClusterStatus();
+ return new ClusterStatus(metrics.getTaskTrackerCount(),
+ metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
+ metrics.getOccupiedMapSlots(),
+ metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
+ metrics.getReduceSlotCapacity(),
+ JobTracker.State.valueOf(cluster.getJobTrackerState().name()),
+ metrics.getDecommissionedTaskTrackerCount());
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+
+ private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
+ Collection<String> list = new ArrayList<String>();
+ for (TaskTrackerInfo info: objs) {
+ list.add(info.getTaskTrackerName());
+ }
+ return list;
+ }
+
+ private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
+ Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
+ for (TaskTrackerInfo info: objs) {
+ BlackListInfo binfo = new BlackListInfo();
+ binfo.setTrackerName(info.getTaskTrackerName());
+ binfo.setReasonForBlackListing(info.getReasonForBlacklist());
+ binfo.setBlackListReport(info.getBlacklistReport());
+ list.add(binfo);
+ }
+ return list;
}
/**
@@ -1216,7 +732,17 @@
* @throws IOException
*/
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
- return jobSubmitClient.getClusterStatus(detailed);
+ try {
+ ClusterMetrics metrics = cluster.getClusterStatus();
+ return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
+ arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
+ cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
+ metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
+ metrics.getReduceSlotCapacity(),
+ JobTracker.State.valueOf(cluster.getJobTrackerState().name()));
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
@@ -1227,17 +753,13 @@
* @throws IOException
*/
public JobStatus[] jobsToComplete() throws IOException {
- return jobSubmitClient.jobsToComplete();
- }
-
- private static void downloadProfile(TaskCompletionEvent e
- ) throws IOException {
- URLConnection connection =
- new URL(getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) +
- "&filter=profile").openConnection();
- InputStream in = connection.getInputStream();
- OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
- IOUtils.copyBytes(in, out, 64 * 1024, true);
+ List<JobStatus> stats = new ArrayList<JobStatus>();
+ for (JobStatus stat : getAllJobs()) {
+ if (!stat.isJobComplete()) {
+ stats.add(stat);
+ }
+ }
+ return stats.toArray(new JobStatus[0]);
}
/**
@@ -1247,7 +769,16 @@
* @throws IOException
*/
public JobStatus[] getAllJobs() throws IOException {
- return jobSubmitClient.getAllJobs();
+ try {
+ Job jobs[] = cluster.getAllJobs();
+ JobStatus[] stats = new JobStatus[jobs.length];
+ for (int i = 0; i < jobs.length; i++) {
+ stats[i] = JobStatus.downgrade(jobs[i].getStatus());
+ }
+ return stats;
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -1281,129 +812,13 @@
public boolean monitorAndPrintJob(JobConf conf,
RunningJob job
) throws IOException, InterruptedException {
- String lastReport = null;
- TaskStatusFilter filter;
- filter = getTaskOutputFilter(conf);
- JobID jobId = job.getID();
- LOG.info("Running job: " + jobId);
- int eventCounter = 0;
- boolean profiling = conf.getProfileEnabled();
- Configuration.IntegerRanges mapRanges = conf.getProfileTaskRange(true);
- Configuration.IntegerRanges reduceRanges = conf.getProfileTaskRange(false);
-
- while (!job.isComplete()) {
- Thread.sleep(1000);
- String report =
- (" map " + StringUtils.formatPercent(job.mapProgress(), 0)+
- " reduce " +
- StringUtils.formatPercent(job.reduceProgress(), 0));
- if (!report.equals(lastReport)) {
- LOG.info(report);
- lastReport = report;
- }
-
- TaskCompletionEvent[] events =
- job.getTaskCompletionEvents(eventCounter);
- eventCounter += events.length;
- for(TaskCompletionEvent event : events){
- TaskCompletionEvent.Status status = event.getTaskStatus();
- if (profiling &&
- (status == TaskCompletionEvent.Status.SUCCEEDED ||
- status == TaskCompletionEvent.Status.FAILED) &&
- (event.isMap ? mapRanges : reduceRanges).
- isIncluded(event.idWithinJob())) {
- downloadProfile(event);
- }
- switch(filter){
- case NONE:
- break;
- case SUCCEEDED:
- if (event.getTaskStatus() ==
- TaskCompletionEvent.Status.SUCCEEDED){
- LOG.info(event.toString());
- displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
- }
- break;
- case FAILED:
- if (event.getTaskStatus() ==
- TaskCompletionEvent.Status.FAILED){
- LOG.info(event.toString());
- // Displaying the task diagnostic information
- TaskAttemptID taskId = event.getTaskAttemptId();
- String[] taskDiagnostics =
- jobSubmitClient.getTaskDiagnostics(taskId);
- if (taskDiagnostics != null) {
- for(String diagnostics : taskDiagnostics){
- System.err.println(diagnostics);
- }
- }
- // Displaying the task logs
- displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
- }
- break;
- case KILLED:
- if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
- LOG.info(event.toString());
- }
- break;
- case ALL:
- LOG.info(event.toString());
- displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
- break;
- }
- }
- }
- LOG.info("Job complete: " + jobId);
- Counters counters = job.getCounters();
- if (counters != null) {
- counters.log(LOG);
- }
- return job.isSuccessful();
+ return ((NetworkedJob)job).monitorAndPrintJob();
}
static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId);
}
- private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
- throws IOException {
- // The tasktracker for a 'failed/killed' job might not be around...
- if (baseUrl != null) {
- // Construct the url for the tasklogs
- String taskLogUrl = getTaskLogURL(taskId, baseUrl);
-
- // Copy tasks's stdout of the JobClient
- getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
-
- // Copy task's stderr to stderr of the JobClient
- getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
- }
- }
-
- private static void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
- OutputStream out) {
- try {
- URLConnection connection = taskLogUrl.openConnection();
- BufferedReader input =
- new BufferedReader(new InputStreamReader(connection.getInputStream()));
- BufferedWriter output =
- new BufferedWriter(new OutputStreamWriter(out));
- try {
- String logData = null;
- while ((logData = input.readLine()) != null) {
- if (logData.length() > 0) {
- output.write(taskId + ": " + logData + "\n");
- output.flush();
- }
- }
- } finally {
- input.close();
- }
- }catch(IOException ioe){
- LOG.warn("Error reading task output" + ioe.getMessage());
- }
- }
-
static Configuration getConfiguration(String jobTrackerSpec)
{
Configuration conf = new Configuration();
@@ -1463,405 +878,12 @@
return this.taskOutputFilter;
}
- private String getJobPriorityNames() {
- StringBuffer sb = new StringBuffer();
- for (JobPriority p : JobPriority.values()) {
- sb.append(p.name()).append(" ");
- }
- return sb.substring(0, sb.length()-1);
- }
-
- /**
- * Display usage of the command-line tool and terminate execution
- */
- private void displayUsage(String cmd) {
- String prefix = "Usage: JobClient ";
- String jobPriorityValues = getJobPriorityNames();
- String taskTypes = "map, reduce, setup, cleanup";
- String taskStates = "running, completed";
- if("-submit".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + " <job-file>]");
- } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + " <job-id>]");
- } else if ("-counter".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + " <job-id> <group-name> <counter-name>]");
- } else if ("-events".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + " <job-id> <from-event-#> <#-of-events>]");
- } else if ("-history".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + " <jobOutputDir>]");
- } else if ("-list".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + " [all]]");
- } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + " <task-id>]");
- } else if ("-set-priority".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
- "Valid values for priorities are: "
- + jobPriorityValues);
- } else if ("-list-active-trackers".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + "]");
- } else if ("-list-blacklisted-trackers".equals(cmd)) {
- System.err.println(prefix + "[" + cmd + "]");
- } else if ("-list-attempt-ids".equals(cmd)) {
- System.err.println(prefix + "[" + cmd +
- " <job-id> <task-type> <task-state>]. " +
- "Valid values for <task-type> are " + taskTypes + ". " +
- "Valid values for <task-state> are " + taskStates);
- } else {
- System.err.printf(prefix + "<command> <args>\n");
- System.err.printf("\t[-submit <job-file>]\n");
- System.err.printf("\t[-status <job-id>]\n");
- System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n");
- System.err.printf("\t[-kill <job-id>]\n");
- System.err.printf("\t[-set-priority <job-id> <priority>]. " +
- "Valid values for priorities are: " +
- jobPriorityValues + "\n");
- System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
- System.err.printf("\t[-history <jobOutputDir>]\n");
- System.err.printf("\t[-list [all]]\n");
- System.err.printf("\t[-list-active-trackers]\n");
- System.err.printf("\t[-list-blacklisted-trackers]\n");
- System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
- "<task-state>]\n");
- System.err.printf("\t[-kill-task <task-id>]\n");
- System.err.printf("\t[-fail-task <task-id>]\n\n");
- ToolRunner.printGenericCommandUsage(System.out);
- }
- }
-
- public int run(String[] argv) throws Exception {
- int exitCode = -1;
- if (argv.length < 1) {
- displayUsage("");
- return exitCode;
- }
- // process arguments
- String cmd = argv[0];
- String submitJobFile = null;
- String jobid = null;
- String taskid = null;
- String outputDir = null;
- String counterGroupName = null;
- String counterName = null;
- String newPriority = null;
- String taskType = null;
- String taskState = null;
- int fromEvent = 0;
- int nEvents = 0;
- boolean getStatus = false;
- boolean getCounter = false;
- boolean killJob = false;
- boolean listEvents = false;
- boolean viewHistory = false;
- boolean viewAllHistory = false;
- boolean listJobs = false;
- boolean listAllJobs = false;
- boolean listActiveTrackers = false;
- boolean listBlacklistedTrackers = false;
- boolean displayTasks = false;
- boolean killTask = false;
- boolean failTask = false;
- boolean setJobPriority = false;
-
- if ("-submit".equals(cmd)) {
- if (argv.length != 2) {
- displayUsage(cmd);
- return exitCode;
- }
- submitJobFile = argv[1];
- } else if ("-status".equals(cmd)) {
- if (argv.length != 2) {
- displayUsage(cmd);
- return exitCode;
- }
- jobid = argv[1];
- getStatus = true;
- } else if("-counter".equals(cmd)) {
- if (argv.length != 4) {
- displayUsage(cmd);
- return exitCode;
- }
- getCounter = true;
- jobid = argv[1];
- counterGroupName = argv[2];
- counterName = argv[3];
- } else if ("-kill".equals(cmd)) {
- if (argv.length != 2) {
- displayUsage(cmd);
- return exitCode;
- }
- jobid = argv[1];
- killJob = true;
- } else if ("-set-priority".equals(cmd)) {
- if (argv.length != 3) {
- displayUsage(cmd);
- return exitCode;
- }
- jobid = argv[1];
- newPriority = argv[2];
- try {
- JobPriority jp = JobPriority.valueOf(newPriority);
- } catch (IllegalArgumentException iae) {
- displayUsage(cmd);
- return exitCode;
- }
- setJobPriority = true;
- } else if ("-events".equals(cmd)) {
- if (argv.length != 4) {
- displayUsage(cmd);
- return exitCode;
- }
- jobid = argv[1];
- fromEvent = Integer.parseInt(argv[2]);
- nEvents = Integer.parseInt(argv[3]);
- listEvents = true;
- } else if ("-history".equals(cmd)) {
- if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
- displayUsage(cmd);
- return exitCode;
- }
- viewHistory = true;
- if (argv.length == 3 && "all".equals(argv[1])) {
- viewAllHistory = true;
- outputDir = argv[2];
- } else {
- outputDir = argv[1];
- }
- } else if ("-list".equals(cmd)) {
- if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
- displayUsage(cmd);
- return exitCode;
- }
- if (argv.length == 2 && "all".equals(argv[1])) {
- listAllJobs = true;
- } else {
- listJobs = true;
- }
- } else if("-kill-task".equals(cmd)) {
- if(argv.length != 2) {
- displayUsage(cmd);
- return exitCode;
- }
- killTask = true;
- taskid = argv[1];
- } else if("-fail-task".equals(cmd)) {
- if(argv.length != 2) {
- displayUsage(cmd);
- return exitCode;
- }
- failTask = true;
- taskid = argv[1];
- } else if ("-list-active-trackers".equals(cmd)) {
- if (argv.length != 1) {
- displayUsage(cmd);
- return exitCode;
- }
- listActiveTrackers = true;
- } else if ("-list-blacklisted-trackers".equals(cmd)) {
- if (argv.length != 1) {
- displayUsage(cmd);
- return exitCode;
- }
- listBlacklistedTrackers = true;
- } else if ("-list-attempt-ids".equals(cmd)) {
- if (argv.length != 4) {
- displayUsage(cmd);
- return exitCode;
- }
- jobid = argv[1];
- taskType = argv[2];
- taskState = argv[3];
- displayTasks = true;
- } else {
- displayUsage(cmd);
- return exitCode;
- }
-
- // initialize JobClient
- JobConf conf = null;
- if (submitJobFile != null) {
- conf = new JobConf(submitJobFile);
- } else {
- conf = new JobConf(getConf());
- }
- init(conf);
-
- // Submit the request
- try {
- if (submitJobFile != null) {
- RunningJob job = submitJob(conf);
- System.out.println("Created job " + job.getID());
- exitCode = 0;
- } else if (getStatus) {
- RunningJob job = getJob(JobID.forName(jobid));
- if (job == null) {
- System.out.println("Could not find job " + jobid);
- } else {
- System.out.println();
- System.out.println(job);
- Counters counters = job.getCounters();
- if (counters != null) {
- System.out.println(counters);
- } else {
- System.out.println("Counters not available. Job is retired.");
- }
- exitCode = 0;
- }
- } else if (getCounter) {
- RunningJob job = getJob(JobID.forName(jobid));
- if (job == null) {
- System.out.println("Could not find job " + jobid);
- } else {
- Counters counters = job.getCounters();
- if (counters == null) {
- System.out.println("Counters not available for retired job " +
- jobid);
- exitCode = -1;
- } else {
- Group group = counters.getGroup(counterGroupName);
- Counter counter = group.getCounterForName(counterName);
- System.out.println(counter.getCounter());
- exitCode = 0;
- }
- }
- } else if (killJob) {
- RunningJob job = getJob(JobID.forName(jobid));
- if (job == null) {
- System.out.println("Could not find job " + jobid);
- } else {
- job.killJob();
- System.out.println("Killed job " + jobid);
- exitCode = 0;
- }
- } else if (setJobPriority) {
- RunningJob job = getJob(JobID.forName(jobid));
- if (job == null) {
- System.out.println("Could not find job " + jobid);
- } else {
- job.setJobPriority(newPriority);
- System.out.println("Changed job priority.");
- exitCode = 0;
- }
- } else if (viewHistory) {
- viewHistory(outputDir, viewAllHistory);
- exitCode = 0;
- } else if (listEvents) {
- listEvents(JobID.forName(jobid), fromEvent, nEvents);
- exitCode = 0;
- } else if (listJobs) {
- listJobs();
- exitCode = 0;
- } else if (listAllJobs) {
- listAllJobs();
- exitCode = 0;
- } else if (listActiveTrackers) {
- listActiveTrackers();
- exitCode = 0;
- } else if (listBlacklistedTrackers) {
- listBlacklistedTrackers();
- exitCode = 0;
- } else if (displayTasks) {
- displayTasks(JobID.forName(jobid), taskType, taskState);
- } else if(killTask) {
- if(jobSubmitClient.killTask(TaskAttemptID.forName(taskid), false)) {
- System.out.println("Killed task " + taskid);
- exitCode = 0;
- } else {
- System.out.println("Could not kill task " + taskid);
- exitCode = -1;
- }
- } else if(failTask) {
- if(jobSubmitClient.killTask(TaskAttemptID.forName(taskid), true)) {
- System.out.println("Killed task " + taskid + " by failing it");
- exitCode = 0;
- } else {
- System.out.println("Could not fail task " + taskid);
- exitCode = -1;
- }
- }
- } finally {
- close();
- }
- return exitCode;
- }
-
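The dispatch above turns the raw jobid/taskid strings into typed IDs with the static forName parsers, which throw IllegalArgumentException on malformed input. A small usage sketch; the ID strings are made up for illustration:

    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.TaskAttemptID;

    public class IdParsing {
      public static void main(String[] args) {
        // forName parses the textual form back into a typed ID.
        JobID job = JobID.forName("job_200911282026_0001");
        TaskAttemptID attempt =
            TaskAttemptID.forName("attempt_200911282026_0001_m_000000_0");
        // An attempt ID carries a back-pointer to its job.
        System.out.println(attempt.getJobID().equals(job)); // true
      }
    }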
- private void viewHistory(String outputDir, boolean all)
- throws IOException {
- HistoryViewer historyViewer = new HistoryViewer(outputDir,
- getConf(), all);
- historyViewer.print();
+  /**
+   * Look up a single counter value, converting the new-API counters to the
+   * old API via Counters.downgrade.
+   */
+  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
+      String counterGroupName, String counterName) throws IOException {
+ Counters counters = Counters.downgrade(cntrs);
+ return counters.findCounter(counterGroupName, counterName).getValue();
}
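The new protected getCounter is typical of this patch: the old JobClient surface stays, but it fetches through the new org.apache.hadoop.mapreduce types and converts back at the boundary with a downgrade call. A self-contained sketch of that shape, using hypothetical NewCounters/OldCounters stand-ins rather than the real Hadoop classes:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for the new-API counters.
    class NewCounters {
      private final Map<String, Long> values = new HashMap<String, Long>();
      void increment(String group, String name, long by) {
        String key = group + "." + name;
        Long old = values.get(key);
        values.put(key, (old == null ? 0L : old.longValue()) + by);
      }
      long value(String group, String name) {
        Long v = values.get(group + "." + name);
        return v == null ? 0L : v.longValue();
      }
    }

    // Hypothetical stand-in for the old-API counters: a view over the new type.
    class OldCounters {
      private final NewCounters delegate;
      private OldCounters(NewCounters delegate) { this.delegate = delegate; }
      static OldCounters downgrade(NewCounters n) { return new OldCounters(n); }
      long findCounter(String group, String name) {
        return delegate.value(group, name);
      }
    }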
- /**
- * List the events for the given job
- * @param jobId the job id for the job's events to list
- * @throws IOException
- */
- private void listEvents(JobID jobId, int fromEventId, int numEvents)
- throws IOException {
- TaskCompletionEvent[] events =
- jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
- System.out.println("Task completion events for " + jobId);
- System.out.println("Number of events (from " + fromEventId +
- ") are: " + events.length);
- for(TaskCompletionEvent event: events) {
- System.out.println(event.getTaskStatus() + " " + event.getTaskAttemptId() + " " +
- getTaskLogURL(event.getTaskAttemptId(),
- event.getTaskTrackerHttp()));
- }
- }
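The removed listEvents shows the windowed contract of getTaskCompletionEvents: a starting offset plus a maximum count. A polling client would typically drain events in a loop, advancing by however many were actually returned; a sketch against a hypothetical EventSource interface, not a Hadoop type:

    import java.io.IOException;

    interface EventSource {
      String[] fetch(int fromEventId, int maxEvents) throws IOException;
    }

    class EventPager {
      // Drain all currently available events in fixed-size windows,
      // advancing the cursor by the number of events actually returned.
      static void drain(EventSource src, int window) throws IOException {
        int from = 0;
        while (true) {
          String[] batch = src.fetch(from, window);
          for (String e : batch) {
            System.out.println(e);
          }
          if (batch.length < window) {
            break; // a short batch means nothing more is available yet
          }
          from += batch.length;
        }
      }
    }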
-
- /**
- * Dump a list of currently running jobs
- * @throws IOException
- */
- private void listJobs() throws IOException {
- JobStatus[] jobs = jobsToComplete();
- if (jobs == null)
- jobs = new JobStatus[0];
-
- System.out.printf("%d jobs currently running\n", jobs.length);
- displayJobList(jobs);
- }
-
- /**
- * Dump a list of all jobs submitted.
- * @throws IOException
- */
- private void listAllJobs() throws IOException {
- JobStatus[] jobs = getAllJobs();
- if (jobs == null)
- jobs = new JobStatus[0];
- System.out.printf("%d jobs submitted\n", jobs.length);
- System.out.printf("States are:\n\tRunning : 1\tSucceded : 2" +
- "\tFailed : 3\tPrep : 4\n");
- displayJobList(jobs);
- }
-
- /**
- * Display the list of active trackers
- */
- private void listActiveTrackers() throws IOException {
- ClusterStatus c = jobSubmitClient.getClusterStatus(true);
- Collection<String> trackers = c.getActiveTrackerNames();
- for (String trackerName : trackers) {
- System.out.println(trackerName);
- }
- }
-
- /**
- * Display the list of blacklisted trackers
- */
- private void listBlacklistedTrackers() throws IOException {
- ClusterStatus c = jobSubmitClient.getClusterStatus(true);
- Collection<BlackListInfo> trackers = c.getBlackListedTrackersInfo();
- if(trackers.size() > 0) {
- System.out.println("BlackListedNode \t Reason \t Report");
- }
- for (BlackListInfo tracker : trackers) {
- System.out.println(tracker.toString());
- }
- }
-
void displayJobList(JobStatus[] jobs) {
System.out.printf("JobId\tState\tStartTime\tUserName\tPriority\tSchedulingInfo\n");
for (JobStatus job : jobs) {
@@ -1878,7 +900,11 @@
* @throws IOException
*/
public int getDefaultMaps() throws IOException {
- return getClusterStatus().getMaxMapTasks();
+ try {
+ return cluster.getClusterStatus().getMapSlotCapacity();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
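The replacement body above is the recurring exception-translation move in this patch: the new Cluster calls declare InterruptedException, which the old signatures never did, so it gets wrapped in an IOException. A sketch of the idiom against a hypothetical SlotSource; restoring the interrupt flag before rethrowing is a common refinement, not something the patch itself does:

    import java.io.IOException;

    // Hypothetical stand-in for the slot-capacity call on the new Cluster API.
    interface SlotSource {
      int getMapSlotCapacity() throws InterruptedException;
    }

    class Boundary {
      static int defaultMaps(SlotSource src) throws IOException {
        try {
          return src.getMapSlotCapacity();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // keep the interrupt visible to callers
          throw new IOException(ie);          // IOException(Throwable) is Java 6+
        }
      }
    }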
/**
@@ -1888,7 +914,11 @@
* @throws IOException
*/
public int getDefaultReduces() throws IOException {
- return getClusterStatus().getMaxReduceTasks();
+ try {
+ return cluster.getClusterStatus().getReduceSlotCapacity();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -1897,12 +927,54 @@
* @return the system directory where job-specific files are to be placed.
*/
public Path getSystemDir() {
- if (sysDir == null) {
- sysDir = new Path(jobSubmitClient.getSystemDir());
+ try {
+ return cluster.getSystemDir();
+ } catch (IOException ioe) {
+ return null;
+ } catch (InterruptedException ie) {
+ return null;
+ }
+ }
+
+ private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
+ throws IOException {
+ JobQueueInfo[] ret = new JobQueueInfo[queues.length];
+ for (int i = 0; i < queues.length; i++) {
+ ret[i] = new JobQueueInfo(queues[i]);
+ }
+ return ret;
+ }
+
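getJobQueueInfoArray exists because a QueueInfo[] cannot be cast to JobQueueInfo[]: the array object's runtime component type is QueueInfo, so a downcast would throw ClassCastException even where the element types are related, and each element has to be converted individually. The same shape with minimal hypothetical types:

    class Info {
      final String name;
      Info(String name) { this.name = name; }
    }

    class InfoView extends Info {
      InfoView(Info source) { super(source.name); }
    }

    class ArrayConvert {
      static InfoView[] convert(Info[] in) {
        // (InfoView[]) in would throw ClassCastException at runtime:
        // the array object was created as Info[], not InfoView[].
        InfoView[] out = new InfoView[in.length];
        for (int i = 0; i < in.length; i++) {
          out[i] = new InfoView(in[i]);
        }
        return out;
      }
    }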
+ /**
+   * Returns an array of queue information objects for the root-level
+   * queues configured in the system.
+ *
+ * @return the array of root level JobQueueInfo objects
+ * @throws IOException
+ */
+ public JobQueueInfo[] getRootQueues() throws IOException {
+ try {
+ return getJobQueueInfoArray(cluster.getRootQueues());
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+
+ /**
+   * Returns an array of queue information objects for the immediate
+   * children of the queue named queueName.
+   *
+   * @param queueName the name of the parent queue
+   * @return the array of JobQueueInfo objects for the immediate children
+ * @throws IOException
+ */
+ public JobQueueInfo[] getChildQueues(String queueName) throws IOException {
+ try {
+ return getJobQueueInfoArray(cluster.getChildQueues(queueName));
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
}
- return sysDir;
}
-
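Taken together, getRootQueues() and getChildQueues(String) are enough to walk the whole queue hierarchy. A possible, untested caller-side sketch; QueueTree and its helpers are illustrative, not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobQueueInfo;

    public class QueueTree {
      static void printTree(JobClient jc) throws IOException {
        for (JobQueueInfo root : jc.getRootQueues()) {
          printQueue(jc, root, 0);
        }
      }

      static void printQueue(JobClient jc, JobQueueInfo q, int depth)
          throws IOException {
        for (int i = 0; i < depth; i++) {
          System.out.print("  "); // two spaces of indent per level
        }
        System.out.println(q.getQueueName());
        for (JobQueueInfo child : jc.getChildQueues(q.getQueueName())) {
          printQueue(jc, child, depth + 1);
        }
      }
    }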
/**
* Return an array of queue information objects about all the Job Queues
@@ -1912,7 +984,11 @@
* @throws IOException
*/
public JobQueueInfo[] getQueues() throws IOException {
- return jobSubmitClient.getQueues();
+ try {
+ return getJobQueueInfoArray(cluster.getQueues());
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -1924,7 +1000,17 @@
*/
public JobStatus[] getJobsFromQueue(String queueName) throws IOException {
- return jobSubmitClient.getJobsFromQueue(queueName);
+ try {
+ org.apache.hadoop.mapreduce.JobStatus[] stats =
+ cluster.getQueue(queueName).getJobStatuses();
+ JobStatus[] ret = new JobStatus[stats.length];
+      for (int i = 0; i < stats.length; i++) {
+ ret[i] = JobStatus.downgrade(stats[i]);
+ }
+ return ret;
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
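A plausible caller-side use of the queue-scoped listing, assuming the stock "default" queue name and the old-API JobStatus accessors:

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobStatus;

    public class QueueJobs {
      public static void main(String[] args) throws Exception {
        JobClient jc = new JobClient(new JobConf());
        JobStatus[] inQueue = jc.getJobsFromQueue("default");
        System.out.println(inQueue.length + " jobs in queue 'default'");
        for (JobStatus js : inQueue) {
          // getJobID()/getRunState() are the old-API accessors
          System.out.println(js.getJobID() + "\tstate=" + js.getRunState());
        }
      }
    }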
/**
@@ -1935,7 +1021,11 @@
* @throws IOException
*/
public JobQueueInfo getQueueInfo(String queueName) throws IOException {
- return jobSubmitClient.getQueueInfo(queueName);
+ try {
+ return new JobQueueInfo(cluster.getQueue(queueName));
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -1944,7 +1034,17 @@
* @throws IOException
*/
public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
- return jobSubmitClient.getQueueAclsForCurrentUser();
+ try {
+ org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
+ cluster.getQueueAclsForCurrentUser();
+ QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
+      for (int i = 0; i < acls.length; i++) {
+ ret[i] = QueueAclsInfo.downgrade(acls[i]);
+ }
+ return ret;
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**