You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2009/04/01 22:25:15 UTC
svn commit: r761046 - in /hadoop/core/trunk: ./
src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/
Author: omalley
Date: Wed Apr 1 20:25:14 2009
New Revision: 761046
URL: http://svn.apache.org/viewvc?rev=761046&view=rev
Log:
HADOOP-5577. Add a verbose flag to mapreduce.Job.waitForCompletion to get
the running job's information printed to the user's stdout as it runs.
(omalley)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=761046&r1=761045&r2=761046&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 1 20:25:14 2009
@@ -1161,6 +1161,10 @@
updated before the JVM exits. Also makes the update to log.index atomic.
(Ravi Gummadi via ddas)
+ HADOOP-5577. Add a verbose flag to mapreduce.Job.waitForCompletion to get
+ the running job's information printed to the user's stdout as it runs.
+ (omalley)
+
Release 0.19.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java?rev=761046&r1=761045&r2=761046&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java Wed Apr 1 20:25:14 2009
@@ -363,7 +363,7 @@
out.println("\nStarting Job ...");
final long startTime = System.currentTimeMillis();
try {
- if (!job.waitForCompletion()) {
+ if (!job.waitForCompletion(true)) {
out.println("Job failed.");
System.exit(1);
}
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java?rev=761046&r1=761045&r2=761046&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java Wed Apr 1 20:25:14 2009
@@ -233,7 +233,7 @@
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- System.exit(job.waitForCompletion() ? 0 : 1);
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?rev=761046&r1=761045&r2=761046&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/WordCount.java Wed Apr 1 20:25:14 2009
@@ -64,6 +64,6 @@
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- System.exit(job.waitForCompletion() ? 0 : 1);
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=761046&r1=761045&r2=761046&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Wed Apr 1 20:25:14 2009
@@ -1245,126 +1245,107 @@
* complete.
*
* @param job the job configuration.
- * @throws IOException
+ * @throws IOException if the job fails
*/
public static RunningJob runJob(JobConf job) throws IOException {
JobClient jc = new JobClient(job);
- boolean error = true;
- RunningJob running = null;
- String lastReport = null;
- final int MAX_RETRIES = 5;
- int retries = MAX_RETRIES;
- TaskStatusFilter filter;
+ RunningJob rj = jc.submitJob(job);
try {
- filter = getTaskOutputFilter(job);
- } catch(IllegalArgumentException e) {
- LOG.warn("Invalid Output filter : " + e.getMessage() +
- " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
- throw e;
+ if (!jc.monitorAndPrintJob(job, rj)) {
+ throw new IOException("Job failed!");
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
}
- try {
- running = jc.submitJob(job);
- JobID jobId = running.getID();
- LOG.info("Running job: " + jobId);
- int eventCounter = 0;
- boolean profiling = job.getProfileEnabled();
- Configuration.IntegerRanges mapRanges = job.getProfileTaskRange(true);
- Configuration.IntegerRanges reduceRanges = job.getProfileTaskRange(false);
-
- while (true) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
- try {
- if (running.isComplete()) {
- break;
- }
- running = jc.getJob(jobId);
- if (running == null) {
- throw new IOException("Unable to fetch job status from server.");
- }
- String report =
- (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
- " reduce " +
- StringUtils.formatPercent(running.reduceProgress(), 0));
- if (!report.equals(lastReport)) {
- LOG.info(report);
- lastReport = report;
- }
-
- TaskCompletionEvent[] events =
- running.getTaskCompletionEvents(eventCounter);
- eventCounter += events.length;
- for(TaskCompletionEvent event : events){
- TaskCompletionEvent.Status status = event.getTaskStatus();
- if (profiling &&
- (status == TaskCompletionEvent.Status.SUCCEEDED ||
- status == TaskCompletionEvent.Status.FAILED) &&
+ return rj;
+ }
+
+ /**
+ * Monitor a job and print status in real-time as progress is made and tasks
+ * fail.
+ * @param conf the job's configuration
+ * @param job the job to track
+ * @return true if the job succeeded
+ * @throws IOException if communication to the JobTracker fails
+ */
+ public boolean monitorAndPrintJob(JobConf conf,
+ RunningJob job
+ ) throws IOException, InterruptedException {
+ String lastReport = null;
+ TaskStatusFilter filter;
+ filter = getTaskOutputFilter(conf);
+ JobID jobId = job.getID();
+ LOG.info("Running job: " + jobId);
+ int eventCounter = 0;
+ boolean profiling = conf.getProfileEnabled();
+ Configuration.IntegerRanges mapRanges = conf.getProfileTaskRange(true);
+ Configuration.IntegerRanges reduceRanges = conf.getProfileTaskRange(false);
+
+ while (!job.isComplete()) {
+ Thread.sleep(1000);
+ String report =
+ (" map " + StringUtils.formatPercent(job.mapProgress(), 0)+
+ " reduce " +
+ StringUtils.formatPercent(job.reduceProgress(), 0));
+ if (!report.equals(lastReport)) {
+ LOG.info(report);
+ lastReport = report;
+ }
+
+ TaskCompletionEvent[] events =
+ job.getTaskCompletionEvents(eventCounter);
+ eventCounter += events.length;
+ for(TaskCompletionEvent event : events){
+ TaskCompletionEvent.Status status = event.getTaskStatus();
+ if (profiling &&
+ (status == TaskCompletionEvent.Status.SUCCEEDED ||
+ status == TaskCompletionEvent.Status.FAILED) &&
(event.isMap ? mapRanges : reduceRanges).
- isIncluded(event.idWithinJob())) {
- downloadProfile(event);
- }
- switch(filter){
- case NONE:
- break;
- case SUCCEEDED:
- if (event.getTaskStatus() ==
- TaskCompletionEvent.Status.SUCCEEDED){
- LOG.info(event.toString());
- displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
- }
- break;
- case FAILED:
- if (event.getTaskStatus() ==
- TaskCompletionEvent.Status.FAILED){
- LOG.info(event.toString());
- // Displaying the task diagnostic information
- TaskAttemptID taskId = event.getTaskAttemptId();
- String[] taskDiagnostics =
- jc.jobSubmitClient.getTaskDiagnostics(taskId);
- if (taskDiagnostics != null) {
- for(String diagnostics : taskDiagnostics){
- System.err.println(diagnostics);
- }
- }
- // Displaying the task logs
- displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
- }
- break;
- case KILLED:
- if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
- LOG.info(event.toString());
+ isIncluded(event.idWithinJob())) {
+ downloadProfile(event);
+ }
+ switch(filter){
+ case NONE:
+ break;
+ case SUCCEEDED:
+ if (event.getTaskStatus() ==
+ TaskCompletionEvent.Status.SUCCEEDED){
+ LOG.info(event.toString());
+ displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+ }
+ break;
+ case FAILED:
+ if (event.getTaskStatus() ==
+ TaskCompletionEvent.Status.FAILED){
+ LOG.info(event.toString());
+ // Displaying the task diagnostic information
+ TaskAttemptID taskId = event.getTaskAttemptId();
+ String[] taskDiagnostics =
+ jobSubmitClient.getTaskDiagnostics(taskId);
+ if (taskDiagnostics != null) {
+ for(String diagnostics : taskDiagnostics){
+ System.err.println(diagnostics);
}
- break;
- case ALL:
- LOG.info(event.toString());
- displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
- break;
}
+ // Displaying the task logs
+ displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
}
- retries = MAX_RETRIES;
- } catch (IOException ie) {
- if (--retries == 0) {
- LOG.warn("Final attempt failed, killing job.");
- throw ie;
+ break;
+ case KILLED:
+ if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
+ LOG.info(event.toString());
}
- LOG.info("Communication problem with server: " +
- StringUtils.stringifyException(ie));
+ break;
+ case ALL:
+ LOG.info(event.toString());
+ displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+ break;
}
}
- if (!running.isSuccessful()) {
- throw new IOException("Job failed!");
- }
- LOG.info("Job complete: " + jobId);
- running.getCounters().log(LOG);
- error = false;
- } finally {
- if (error && (running != null)) {
- running.killJob();
- }
- jc.close();
}
- return running;
+ LOG.info("Job complete: " + jobId);
+ job.getCounters().log(LOG);
+ return job.isSuccessful();
}
static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=761046&r1=761045&r2=761046&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java Wed Apr 1 20:25:14 2009
@@ -38,7 +38,7 @@
public class Job extends JobContext {
public static enum JobState {DEFINE, RUNNING};
private JobState state = JobState.DEFINE;
- private JobClient jobTracker;
+ private JobClient jobClient;
private RunningJob info;
public Job() throws IOException {
@@ -47,7 +47,7 @@
public Job(Configuration conf) throws IOException {
super(conf, null);
- jobTracker = new JobClient((JobConf) getConfiguration());
+ jobClient = new JobClient((JobConf) getConfiguration());
}
public Job(Configuration conf, String jobName) throws IOException {
@@ -429,22 +429,29 @@
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
- info = jobTracker.submitJobInternal(conf);
+ info = jobClient.submitJobInternal(conf);
state = JobState.RUNNING;
}
/**
* Submit the job to the cluster and wait for it to finish.
+ * @param verbose print the progress to the user
* @return true if the job succeeded
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
*/
- public boolean waitForCompletion() throws IOException, InterruptedException,
+ public boolean waitForCompletion(boolean verbose
+ ) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
- info.waitForCompletion();
+ if (verbose) {
+ jobClient.monitorAndPrintJob(conf, info);
+ } else {
+ info.waitForCompletion();
+ }
return isSuccessful();
}
+
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java?rev=761046&r1=761045&r2=761046&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java Wed Apr 1 20:25:14 2009
@@ -107,7 +107,7 @@
} else {
job.setOutputFormatClass(TextOutputFormat.class);
}
- assertTrue(job.waitForCompletion());
+ assertTrue(job.waitForCompletion(true));
}
public void createInput(FileSystem fs, int numMappers) throws Exception {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=761046&r1=761045&r2=761046&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Wed Apr 1 20:25:14 2009
@@ -111,7 +111,7 @@
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
- assertTrue(job.waitForCompletion());
+ assertTrue(job.waitForCompletion(false));
String out = readFile("out/part-r-00000");
System.out.println(out);
assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
@@ -156,7 +156,7 @@
FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
- assertTrue(job.waitForCompletion());
+ assertTrue(job.waitForCompletion(true));
String out = readFile("out/part-r-00000");
assertEquals("------------------------------------------------\n" +
"-3\t23\n" +