Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/11 10:16:29 UTC
svn commit: r420760 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/conf/ src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Tue Jul 11 01:16:28 2006
New Revision: 420760
URL: http://svn.apache.org/viewvc?rev=420760&view=rev
Log:
HADOOP-313. Permit task state to be saved so that single tasks may be manually re-executed when debugging. Contributed by Owen.
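(Usage sketch, inferred from this patch rather than taken from the commit; the task-directory location is illustrative.) Run the job with keep.failed.task.files enabled, then, on the node where the failing task ran, point the new runner at the job.xml the tasktracker saved in the task's work directory:

  $ bin/hadoop org.apache.hadoop.mapred.IsolationRunner <task-dir>/job.xml

IsolationRunner rebuilds the task from the localized configuration and re-executes it in-process, reporting progress through a FakeUmbilical that simply logs.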
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 11 01:16:28 2006
@@ -7,6 +7,9 @@
/bin/bash, for better portability.
(Jean-Baptiste Quenot via cutting)
+ 2. HADOOP-313. Permit task state to be saved so that single tasks
+ may be manually re-executed when debugging. (omalley via cutting)
+
Release 0.4.0 - 2006-06-28
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Tue Jul 11 01:16:28 2006
@@ -494,6 +494,14 @@
}
}
+ /**
+ * Set the class loader that will be used to load the various objects.
+ * @param classLoader the new class loader
+ */
+ public void setClassLoader(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("Configuration: ");
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=420760&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Jul 11 01:16:28 2006
@@ -0,0 +1,162 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.SequenceFile;
+
+public class IsolationRunner {
+ private static final Log LOG =
+ LogFactory.getLog(IsolationRunner.class.getName());
+
+ private static class FakeUmbilical implements TaskUmbilicalProtocol {
+
+ public void done(String taskid) throws IOException {
+ LOG.info("Task " + taskid + " reporting done.");
+ }
+
+ public void fsError(String message) throws IOException {
+ LOG.info("Task reporting file system error: " + message);
+ }
+
+ public Task getTask(String taskid) throws IOException {
+ return null;
+ }
+
+ public boolean ping(String taskid) throws IOException {
+ return true;
+ }
+
+ public void progress(String taskid, float progress, String state
+ ) throws IOException {
+ StringBuffer buf = new StringBuffer("Task ");
+ buf.append(taskid);
+ buf.append(" making progress to ");
+ buf.append(progress);
+ if (state != null) {
+ buf.append(" and state of ");
+ buf.append(state);
+ }
+ LOG.info(buf.toString());
+ }
+
+ public void reportDiagnosticInfo(String taskid, String trace) throws IOException {
+ LOG.info("Task " + taskid + " has problem " + trace);
+ }
+
+ }
+
+ private static ClassLoader makeClassLoader(JobConf conf,
+ File workDir) throws IOException {
+ List cp = new ArrayList();
+
+ String jar = conf.getJar();
+ if (jar != null) { // if a job jar exists, it has been expanded into workDir
+ File[] libs = new File(workDir, "lib").listFiles();
+ if (libs != null) {
+ for (int i = 0; i < libs.length; i++) {
+ cp.add(new URL("file:" + libs[i].toString()));
+ }
+ }
+ cp.add(new URL("file:" + new File(workDir, "classes/").toString()));
+ cp.add(new URL("file:" + workDir.toString() + "/"));
+ }
+ return new URLClassLoader((URL[]) cp.toArray(new URL[cp.size()]));
+ }
+
+ /**
+ * Create empty sequence files for any of the map outputs that we don't have.
+ * @param fs the filesystem to create the files in
+ * @param taskId the id of the reduce task whose map inputs are being created
+ * @param numMaps the number of map tasks in the job
+ * @param conf the jobconf
+ * @throws IOException if something goes wrong writing
+ */
+ private static void fillInMissingMapOutputs(FileSystem fs,
+ String taskId,
+ int numMaps,
+ JobConf conf) throws IOException {
+ Class keyClass = conf.getMapOutputKeyClass();
+ Class valueClass = conf.getMapOutputValueClass();
+ MapOutputFile namer = new MapOutputFile();
+ namer.setConf(conf);
+ for(int i=0; i<numMaps; i++) {
+ Path f = namer.getInputFile(i, taskId);
+ if(! fs.exists(f)) {
+ LOG.info("Create missing input: " + f);
+ SequenceFile.Writer out = new SequenceFile.Writer(fs, f, keyClass,
+ valueClass);
+ out.close();
+ }
+ }
+ }
+
+ /**
+ * Run a single task
+ * @param args the first argument is the task directory
+ */
+ public static void main(String[] args) throws IOException {
+ if (args.length != 1) {
+ System.out.println("Usage: IsolationRunner <path>/job.xml");
+ System.exit(1);
+ }
+ File jobFilename = new File(args[0]);
+ if (!jobFilename.exists() || !jobFilename.isFile()) {
+ System.out.println(jobFilename + " is not a valid job file.");
+ System.exit(1);
+ }
+ JobConf conf = new JobConf(new Path(jobFilename.toString()));
+ String taskId = conf.get("mapred.task.id");
+ boolean isMap = conf.getBoolean("mapred.task.is.map", true);
+ String jobId = conf.get("mapred.job.id");
+ int partition = conf.getInt("mapred.task.partition", 0);
+
+ // setup the local and user working directories
+ FileSystem local = FileSystem.getNamed("local", conf);
+ File workDirName = new File(jobFilename.getParent(), "work");
+ local.setWorkingDirectory(new Path(workDirName.toString()));
+ FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
+
+ // set up a classloader with the right classpath
+ ClassLoader classLoader = makeClassLoader(conf, workDirName);
+ Thread.currentThread().setContextClassLoader(classLoader);
+ conf.setClassLoader(classLoader);
+
+ Task task;
+ if (isMap) {
+ FileSplit split = new FileSplit(new Path(conf.get("map.input.file")),
+ conf.getLong("map.input.start", 0),
+ conf.getLong("map.input.length", 0));
+ task = new MapTask(jobId, jobFilename.toString(), taskId, partition,
+ split);
+ } else {
+ int numMaps = conf.getNumMapTasks();
+ fillInMissingMapOutputs(local, taskId, numMaps, conf);
+ task = new ReduceTask(jobId, jobFilename.toString(), taskId,
+ partition, numMaps);
+ }
+ task.setConf(conf);
+ task.run(conf, new FakeUmbilical());
+ }
+
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Jul 11 01:16:28 2006
@@ -194,6 +194,22 @@
public void setUser(String user) {
set("user.name", user);
}
+
+ /**
+ * Set whether the framework should keep the intermediate files for
+ * failed tasks.
+ */
+ public void setKeepFailedTaskFiles(boolean keep) {
+ setBoolean("keep.failed.task.files", keep);
+ }
+
+ /**
+ * Should the temporary files for failed tasks be kept?
+ * @return true if the files should be kept
+ */
+ public boolean getKeepFailedTaskFiles() {
+ return getBoolean("keep.failed.task.files", false);
+ }
/**
* Set the current working directory for the default file system
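A minimal sketch of using the new JobConf methods from job-submission code (the surrounding job setup is elided and hypothetical):

  JobConf job = new JobConf();
  job.setKeepFailedTaskFiles(true); // same as setting keep.failed.task.files=true
  // ... configure mapper, reducer, paths and submit as usual ...

When the flag is set, the tasktracker's cleanup (see the TaskTracker change below) leaves a failed task's files in place so IsolationRunner can re-execute it.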
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jul 11 01:16:28 2006
@@ -78,13 +78,14 @@
// split input into minimum number of splits
FileSplit[] splits;
splits = job.getInputFormat().getSplits(fs, job, 1);
-
+ String jobId = profile.getJobId();
// run a map task for each split
job.setNumReduceTasks(1); // force a single reduce task
for (int i = 0; i < splits.length; i++) {
mapIds.add("map_" + newId());
- MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
+ MapTask map = new MapTask(jobId, file, (String)mapIds.get(i), i,
+ splits[i]);
map.setConf(job);
map_tasks += 1;
map.run(job, this);
@@ -104,8 +105,8 @@
}
// run a single reduce task
- ReduceTask reduce = new ReduceTask(profile.getJobId(), file,
- reduceId, mapIds.size(),0);
+ ReduceTask reduce = new ReduceTask(jobId, file,
+ reduceId, 0, mapIds.size());
reduce.setConf(job);
reduce_tasks += 1;
reduce.run(job, this);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue Jul 11 01:16:28 2006
@@ -39,8 +39,9 @@
public MapTask() {}
- public MapTask(String jobFile, String taskId, FileSplit split) {
- super(jobFile, taskId);
+ public MapTask(String jobId, String jobFile, String taskId,
+ int partition, FileSplit split) {
+ super(jobId, jobFile, taskId, partition);
this.split = split;
}
@@ -48,6 +49,13 @@
return true;
}
+ public void localizeConfiguration(JobConf conf) {
+ super.localizeConfiguration(conf);
+ conf.set("map.input.file", split.getPath().toString());
+ conf.setLong("map.input.start", split.getStart());
+ conf.setLong("map.input.length", split.getLength());
+ }
+
public TaskRunner createRunner(TaskTracker tracker) {
return new MapTaskRunner(this, tracker, this.conf);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jul 11 01:16:28 2006
@@ -36,9 +36,7 @@
});
}
- private UTF8 jobId = new UTF8();
private int numMaps;
- private int partition;
private boolean sortComplete;
{ getProgress().setStatus("reduce"); }
@@ -52,11 +50,9 @@
public ReduceTask() {}
public ReduceTask(String jobId, String jobFile, String taskId,
- int numMaps, int partition) {
- super(jobFile, taskId);
- this.jobId.set(jobId);
+ int partition, int numMaps) {
+ super(jobId, jobFile, taskId, partition);
this.numMaps = numMaps;
- this.partition = partition;
}
public TaskRunner createRunner(TaskTracker tracker) throws IOException {
@@ -67,31 +63,26 @@
return false;
}
+ public int getNumMaps() { return numMaps; }
+
/**
- * Get the job name for this task.
- * @return the job name
+ * Localize the given JobConf to be specific for this task.
*/
- public UTF8 getJobId() {
- return jobId;
+ public void localizeConfiguration(JobConf conf) {
+ super.localizeConfiguration(conf);
+ conf.setNumMapTasks(numMaps);
}
-
- public int getNumMaps() { return numMaps; }
- public int getPartition() { return partition; }
public void write(DataOutput out) throws IOException {
super.write(out);
- jobId.write(out);
out.writeInt(numMaps); // write the number of maps
- out.writeInt(partition); // write partition
}
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- jobId.readFields(in);
numMaps = in.readInt();
- this.partition = in.readInt(); // read partition
}
/** Iterates values while keys match in sorted input. */
@@ -215,7 +206,7 @@
// sort the input file
SequenceFile.Sorter sorter =
new SequenceFile.Sorter(lfs, comparator, valueClass, job);
- sorter.sort(mapFiles, sortedFile, true); // sort
+ sorter.sort(mapFiles, sortedFile, !conf.getKeepFailedTaskFiles()); // sort; delete inputs unless keeping failed task files
} finally {
sortComplete = true;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Jul 11 01:16:28 2006
@@ -35,15 +35,20 @@
private String jobFile; // job configuration file
private String taskId; // unique, includes job id
+ private String jobId; // unique jobid
+ private int partition; // id within job
+
////////////////////////////////////////////
// Constructors
////////////////////////////////////////////
public Task() {}
- public Task(String jobFile, String taskId) {
+ public Task(String jobId, String jobFile, String taskId, int partition) {
this.jobFile = jobFile;
this.taskId = taskId;
+ this.jobId = jobId;
+ this.partition = partition;
}
////////////////////////////////////////////
@@ -52,6 +57,22 @@
public void setJobFile(String jobFile) { this.jobFile = jobFile; }
public String getJobFile() { return jobFile; }
public String getTaskId() { return taskId; }
+
+ /**
+ * Get the id of the job that this task belongs to.
+ * @return the job id
+ */
+ public String getJobId() {
+ return jobId;
+ }
+
+ /**
+ * Get the index of this task within the job.
+ * @return the integer part of the task id
+ */
+ public int getPartition() {
+ return partition;
+ }
////////////////////////////////////////////
// Writable methods
@@ -60,14 +81,28 @@
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, jobFile);
UTF8.writeString(out, taskId);
+ UTF8.writeString(out, jobId);
+ out.writeInt(partition);
}
public void readFields(DataInput in) throws IOException {
jobFile = UTF8.readString(in);
taskId = UTF8.readString(in);
+ jobId = UTF8.readString(in);
+ partition = in.readInt();
}
public String toString() { return taskId; }
+ /**
+ * Localize the given JobConf to be specific for this task.
+ */
+ public void localizeConfiguration(JobConf conf) {
+ conf.set("mapred.task.id", taskId);
+ conf.setBoolean("mapred.task.is.map", isMapTask());
+ conf.setInt("mapred.task.partition", partition);
+ conf.set("mapred.job.id", jobId);
+ }
+
/** Run this task as a part of the named job. This method is executed in the
* child process and is what invokes user-supplied map, reduce, etc. methods.
* @param umbilical for progress reports
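For orientation, a sketch (not part of the commit) showing that the properties written by localizeConfiguration are exactly the ones IsolationRunner.main reads back to reconstruct a task:

  // mirrors the lookups in IsolationRunner.main above
  String taskId  = conf.get("mapred.task.id");
  boolean isMap  = conf.getBoolean("mapred.task.is.map", true);
  String jobId   = conf.get("mapred.job.id");
  int partition  = conf.getInt("mapred.task.partition", 0);

MapTask.localizeConfiguration (above) additionally records map.input.file, map.input.start and map.input.length so the input split can be rebuilt without the original FileSplit object.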
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Jul 11 01:16:28 2006
@@ -413,12 +413,12 @@
String taskid = (String) usableTaskIds.first();
usableTaskIds.remove(taskid);
+ String jobId = job.getProfile().getJobId();
if (isMapTask()) {
- t = new MapTask(jobFile, taskid, split);
+ t = new MapTask(jobId, jobFile, taskid, partition, split);
} else {
- t = new ReduceTask(job.getProfile().getJobId(), jobFile, taskid,
- numMaps, partition);
+ t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
}
t.setConf(conf);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jul 11 01:16:28 2006
@@ -652,6 +652,7 @@
boolean wasKilled = false;
private JobConf defaultJobConf;
private JobConf localJobConf;
+ private boolean keepFailedTaskFiles;
/**
*/
@@ -682,7 +683,6 @@
t.setJobFile(localJobFile.toString());
localJobConf = new JobConf(localJobFile);
- localJobConf.set("mapred.task.id", task.getTaskId());
localJobConf.set("mapred.local.dir",
this.defaultJobConf.get("mapred.local.dir"));
String jarFile = localJobConf.getJar();
@@ -690,6 +690,7 @@
fs.copyToLocalFile(new Path(jarFile), localJarFile);
localJobConf.setJar(localJarFile.toString());
}
+ task.localizeConfiguration(localJobConf);
FileSystem localFs = FileSystem.getNamed("local", fConf);
OutputStream out = localFs.create(localJobFile);
@@ -701,6 +702,7 @@
// set the task's configuration to the local job conf
// rather than the default.
t.setConf(localJobConf);
+ keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
}
/**
@@ -875,6 +877,9 @@
LOG.debug("Cleaning up " + taskId);
synchronized (TaskTracker.this) {
tasks.remove(taskId);
+ if (runstate == TaskStatus.FAILED && keepFailedTaskFiles) {
+ return;
+ }
synchronized (this) {
try {
runner.close();