You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/11/21 21:40:25 UTC
svn commit: r477876 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Tue Nov 21 12:40:24 2006
New Revision: 477876
URL: http://svn.apache.org/viewvc?view=rev&rev=477876
Log:
HADOOP-76. Implement speculative reduce. Contributed by Sanjay.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Nov 21 12:40:24 2006
@@ -116,6 +116,12 @@
for sorting datanode list by various columns.
(Raghu Angadi via cutting)
+35. HADOOP-76. Implement speculative reduce. Now when a job is
+ configured for speculative execution, both maps and reduces will
+ execute speculatively. Reduce outputs are written to temporary
+ location and moved to the final location when reduce is complete.
+ (Sanjay Dahiya via cutting)
+
Release 0.8.0 - 2006-11-03
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Nov 21 12:40:24 2006
@@ -154,12 +154,12 @@
FileSplit split = new FileSplit(new Path(conf.get("map.input.file")),
conf.getLong("map.input.start", 0),
conf.getLong("map.input.length", 0));
- task = new MapTask(jobId, jobFilename.toString(), taskId, partition,
- split);
+ task = new MapTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"),
+ taskId, partition, split);
} else {
int numMaps = conf.getNumMapTasks();
fillInMissingMapOutputs(local, taskId, numMaps, conf);
- task = new ReduceTask(jobId, jobFilename.toString(), taskId,
+ task = new ReduceTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), taskId,
partition, numMaps);
}
task.setConf(conf);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Nov 21 12:40:24 2006
@@ -357,7 +357,7 @@
return null;
}
- double avgProgress = status.reduceProgress() / reduces.length;
+ double avgProgress = status.reduceProgress() ;
int target = findNewTask(tts, clusterSize, avgProgress,
reduces, null);
if (target == -1) {
@@ -438,8 +438,10 @@
LOG.info("Choosing normal task " + tasks[i].getTIPId());
return i;
} else if (specTarget == -1 &&
- task.hasSpeculativeTask(avgProgress)) {
+ task.hasSpeculativeTask(avgProgress) &&
+ ! task.hasRunOnMachine(taskTracker)) {
specTarget = i;
+ break ;
}
}
}
@@ -691,6 +693,11 @@
// so we remove that directory to cleanup
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(profile.getJobFile()).getParent());
+
+ // Delete temp dfs dirs created if any, like in case of
+ // speculative exn of reduces.
+ // String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString;
+ // fs.delete(new Path(tempDir));
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Nov 21 12:40:24 2006
@@ -96,8 +96,10 @@
// run a map task for each split
job.setNumReduceTasks(1); // force a single reduce task
for (int i = 0; i < splits.length; i++) {
- mapIds.add("map_" + newId());
- MapTask map = new MapTask(jobId, file, (String)mapIds.get(i), i,
+ String mapId = "map_" + newId() ;
+ mapIds.add(mapId);
+ MapTask map = new MapTask(jobId, file, "tip_m_" + mapId,
+ mapId, i,
splits[i]);
JobConf localConf = new JobConf(job);
map.localizeConfiguration(localConf);
@@ -126,7 +128,7 @@
{
ReduceTask reduce = new ReduceTask(jobId, file,
- reduceId, 0, mapIds.size());
+ "tip_r_0001", reduceId, 0, mapIds.size());
JobConf localConf = new JobConf(job);
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue Nov 21 12:40:24 2006
@@ -85,9 +85,9 @@
public MapTask() {}
- public MapTask(String jobId, String jobFile, String taskId,
+ public MapTask(String jobId, String jobFile, String tipId, String taskId,
int partition, FileSplit split) {
- super(jobId, jobFile, taskId, partition);
+ super(jobId, jobFile, tipId, taskId, partition);
this.split = split;
myMetrics = new MapTaskMetrics(taskId);
}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=auto&rev=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Tue Nov 21 12:40:24 2006
@@ -0,0 +1,464 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This class acts as a proxy to the actual file system being used.
+ * It writes files to a temporary location and on
+ * commit, moves to final location. On abort or a failure in
+ * commit the temporary file is deleted
+ * PhasedFileSystem works in context of a task. A different instance of
+ * PhasedFileSystem should be used for every task.
+ * Temporary files are written in ("mapred.system.dir")/<jobid>/<tipid>/<taskid>
+ * If one task opens a large number of files in succession then it is
+ * better to commit(Path) individual files when done. Otherwise
+ * commit() can be used to commit all open files at once.
+ */
+class PhasedFileSystem extends FileSystem {
+
+ private FileSystem baseFS ;
+ // Map from final file name to temporary file name
+ private Map<Path, FileInfo> finalNameToFileInfo = new HashMap();
+
+ private String jobid ;
+ private String tipid ;
+ private String taskid ;
+
+ private Path tempDir ;
+ /**
+ * This Constructor is used to wrap a FileSystem object to a
+ * PhasedFileSystem.
+ * @param fs base file system object
+ * @param jobid JobId
+ * @param tipid tipId
+ * @param taskid taskId
+ */
+ public PhasedFileSystem(FileSystem fs, String jobid,
+ String tipid, String taskid) {
+ super(fs.getConf()); // not used
+
+ this.baseFS = fs ;
+ this.jobid = jobid;
+ this.tipid = tipid ;
+ this.taskid = taskid ;
+
+ tempDir = new Path(baseFS.getConf().get("mapred.system.dir") );
+ }
+ /**
+ * This Constructor is used to wrap a FileSystem object to a
+ * PhasedFileSystem.
+ * @param fs base file system object
+ * @param conf JobConf
+ */
+ public PhasedFileSystem(FileSystem fs, JobConf conf) {
+ super(fs.getConf()); // not used
+
+ this.baseFS = fs ;
+ this.jobid = conf.get("mapred.job.id");
+ this.tipid = conf.get("mapred.tip.id");
+ this.taskid = conf.get("mapred.task.id") ;
+
+ tempDir = new Path(baseFS.getConf().get("mapred.system.dir") );
+ }
+ /**
+ * This Constructor should not be used in this or any derived class.
+ * @param conf
+ */
+ protected PhasedFileSystem(Configuration conf){
+ super(conf);
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ private Path setupFile(Path finalFile, boolean overwrite) throws IOException{
+ if( finalNameToFileInfo.containsKey(finalFile) ){
+ if( !overwrite ){
+ throw new IOException("Error, file already exists : " +
+ finalFile.toString());
+ }else{
+ // delete the temp file so that a new one can be created.
+ FileInfo fInfo = finalNameToFileInfo.get(finalFile);
+ try{
+ fInfo.getOpenFileStream().close();
+ }catch(IOException ioe){
+ // ignore if already closed
+ }
+ baseFS.delete( fInfo.getTempPath() );
+ finalNameToFileInfo.remove(finalFile);
+ }
+ }
+
+ String uniqueFile = jobid + "/" + tipid + "/" + taskid + "/" + finalFile.getName();
+
+ Path tempPath = new Path(tempDir, new Path(uniqueFile));
+ FileInfo fInfo = new FileInfo(tempPath, finalFile, overwrite);
+
+ finalNameToFileInfo.put(finalFile, fInfo);
+
+ return tempPath ;
+ }
+
+ @Override
+ public FSOutputStream createRaw(
+ Path f, boolean overwrite, short replication, long blockSize)
+ throws IOException {
+
+ // for reduce output it's checked in the job client but let's check it anyway
+ // as tasks with side effects may write to locations not set in jobconf
+ // as the output path.
+ if( baseFS.exists(f) && !overwrite ){
+ throw new IOException("Error creating file - already exists : " + f);
+ }
+ FSOutputStream stream =
+ baseFS.createRaw(setupFile(f, overwrite), overwrite, replication, blockSize);
+ finalNameToFileInfo.get(f).setOpenFileStream(stream);
+ return stream;
+ }
+
+ @Override
+ public FSOutputStream createRaw(
+ Path f, boolean overwrite, short replication, long blockSize,
+ Progressable progress)
+ throws IOException {
+ if( baseFS.exists(f) && !overwrite ){
+ throw new IOException("Error creating file - already exists : " + f);
+ }
+ FSOutputStream stream =
+ baseFS.createRaw(setupFile(f, overwrite), overwrite, replication,
+ blockSize, progress);
+ finalNameToFileInfo.get(f).setOpenFileStream(stream);
+ return stream ;
+ }
+ /**
+ * Commits a single file to its final location as passed in create* methods.
+ * If a file already exists in final location then temporary file is deleted.
+ * @param fPath path to final file.
+ * @throws IOException thrown if commit fails
+ */
+ public void commit(Path fPath) throws IOException{
+ commit(fPath, true);
+ }
+
+ // use extra method arg to avoid concurrentModificationException
+ // if committing using this method while iterating.
+ private void commit(Path fPath , boolean removeFromMap)throws IOException{
+ FileInfo fInfo = finalNameToFileInfo.get(fPath) ;
+ if( null == fInfo ){
+ throw new IOException("Error committing file! File was not created " +
+ "with PhasedFileSystem : " + fPath);
+ }
+ try{
+ fInfo.getOpenFileStream().close();
+ }catch(IOException ioe){
+ // ignore if already closed
+ ioe.printStackTrace();
+ }
+ Path tempPath = fInfo.getTempPath();
+ // ignore .crc files
+ if(! tempPath.toString().endsWith(".crc")){
+ if( !baseFS.exists(fPath) || fInfo.isOverwrite()){
+ if(! baseFS.exists(fPath.getParent())){
+ baseFS.mkdirs(fPath.getParent());
+ }
+
+ if( baseFS.exists(fPath) && fInfo.isOverwrite()){
+ baseFS.delete(fPath);
+ }
+
+ try {
+ if( ! baseFS.rename(fInfo.getTempPath(), fPath) ){
+ // delete the temp file if rename failed
+ baseFS.delete(fInfo.getTempPath());
+ }
+ }catch(IOException ioe){
+ // rename failed, log error and delete temp files
+ LOG.error("PhasedFileSystem failed to commit file : " + fPath
+ + " error : " + ioe.getMessage());
+ baseFS.delete(fInfo.getTempPath());
+ }
+ }else{
+ // delete temp file
+ baseFS.delete(fInfo.getTempPath());
+ }
+ // done with the file
+ if( removeFromMap ){
+ finalNameToFileInfo.remove(fPath);
+ }
+ }
+ }
+
+ /**
+ * Commits files to their final locations as passed in create* methods.
+ * If a file already exists in final location then temporary file is deleted.
+ * This method ignores crc files (ending with .crc). This method doesn't close
+ * the file system so it can still be used to create new files.
+ * @throws IOException if any file fails to commit
+ */
+ public void commit() throws IOException {
+ for( Path fPath : finalNameToFileInfo.keySet()){
+ commit(fPath, false);
+ }
+ // safe to clear map now
+ finalNameToFileInfo.clear();
+ }
+ /**
+ * Aborts a single file. The temporary created file is deleted.
+ * @param p the path to final file as passed in create* call
+ * @throws IOException if File delete fails
+ */
+ public void abort(Path p)throws IOException{
+ abort(p, true);
+ }
+
+ // use extra method arg to avoid concurrentModificationException
+ // if aborting using this method while iterating.
+ private void abort(Path p, boolean removeFromMap) throws IOException{
+ FileInfo fInfo = finalNameToFileInfo.get(p);
+ if( null != fInfo ){
+ try{
+ fInfo.getOpenFileStream().close();
+ }catch(IOException ioe){
+ // ignore if already closed
+ }
+ baseFS.delete(fInfo.getTempPath());
+ if( removeFromMap ){
+ finalNameToFileInfo.remove(p);
+ }
+ }
+ }
+ /**
+ * Aborts the file creation, all uncommitted files created by this PhasedFileSystem
+ * instance are deleted. This does not close baseFS because a handle to baseFS may still
+ * exist and can be used to create new files.
+ *
+ * @throws IOException
+ */
+ public void abort() throws IOException {
+ for(Path fPath : finalNameToFileInfo.keySet() ){
+ abort(fPath, false);
+ }
+ // safe to clean now
+ finalNameToFileInfo.clear();
+ }
+ /**
+ * Closes base file system.
+ */
+ public void close() throws IOException {
+ baseFS.close();
+ }
+
+ @Override
+ public short getReplication(
+ Path src)
+ throws IOException {
+ // keep replication same for temp file as for
+ // final file.
+ return baseFS.getReplication(src);
+ }
+
+ @Override
+ public boolean setReplicationRaw(
+ Path src, short replication)
+ throws IOException {
+ // declares IOException for interface compatibility with
+ // the base class; the operation itself is not supported.
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public boolean renameRaw(
+ Path src, Path dst)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public boolean deleteRaw(
+ Path f)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public boolean exists(Path f)
+ throws IOException {
+ return baseFS.exists(f);
+ }
+
+ @Override
+ public boolean isDirectory(Path f)
+ throws IOException {
+ return baseFS.isDirectory(f);
+ }
+
+ @Override
+ public long getLength(Path f)
+ throws IOException {
+ return baseFS.getLength(f);
+ }
+
+ @Override
+ public Path[] listPathsRaw(Path f)
+ throws IOException {
+ return baseFS.listPathsRaw(f);
+ }
+
+ @Override
+ public void setWorkingDirectory(Path new_dir) {
+ baseFS.setWorkingDirectory(new_dir);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return baseFS.getWorkingDirectory();
+ }
+
+ @Override
+ public boolean mkdirs(Path f)
+ throws IOException {
+ return baseFS.mkdirs(f) ;
+ }
+
+ @Override
+ public void lock(
+ Path f, boolean shared)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public void release(
+ Path f)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public void copyFromLocalFile(
+ Path src, Path dst)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public void moveFromLocalFile(
+ Path src, Path dst)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public void copyToLocalFile(
+ Path src, Path dst)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public Path startLocalOutput(
+ Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public void completeLocalOutput(
+ Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public void reportChecksumFailure(
+ Path f, FSInputStream in, long start, long length, int crc) {
+ baseFS.reportChecksumFailure(f, in, start, length, crc);
+ }
+
+ @Override
+ public long getBlockSize(
+ Path f)
+ throws IOException {
+ return baseFS.getBlockSize(f);
+ }
+
+ @Override
+ public long getDefaultBlockSize() {
+ return baseFS.getDefaultBlockSize();
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return baseFS.getDefaultReplication();
+ }
+
+ @Override
+ public String[][] getFileCacheHints(
+ Path f, long start, long len)
+ throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public String getName() {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public FSInputStream openRaw(Path f)
+ throws IOException {
+ return baseFS.openRaw(f);
+ }
+
+ private class FileInfo {
+ private Path tempPath ;
+ private Path finalPath ;
+ private FSOutputStream openFileStream ;
+ private boolean overwrite ;
+
+ FileInfo(Path tempPath, Path finalPath, boolean overwrite){
+ this.tempPath = tempPath ;
+ this.finalPath = finalPath ;
+ this.overwrite = overwrite;
+ }
+ public FSOutputStream getOpenFileStream() {
+ return openFileStream;
+ }
+ public void setOpenFileStream(
+ FSOutputStream openFileStream) {
+ this.openFileStream = openFileStream;
+ }
+ public Path getFinalPath() {
+ return finalPath;
+ }
+ public void setFinalPath(
+ Path finalPath) {
+ this.finalPath = finalPath;
+ }
+ public boolean isOverwrite() {
+ return overwrite;
+ }
+ public void setOverwrite(
+ boolean overwrite) {
+ this.overwrite = overwrite;
+ }
+ public Path getTempPath() {
+ return tempPath;
+ }
+ public void setTempPath(
+ Path tempPath) {
+ this.tempPath = tempPath;
+ }
+
+ }
+
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 21 12:40:24 2006
@@ -80,9 +80,9 @@
public ReduceTask() {}
- public ReduceTask(String jobId, String jobFile, String taskId,
+ public ReduceTask(String jobId, String jobFile, String tipId, String taskId,
int partition, int numMaps) {
- super(jobId, jobFile, taskId, partition);
+ super(jobId, jobFile, tipId, taskId, partition);
this.numMaps = numMaps;
myMetrics = new ReduceTaskMetrics(taskId);
}
@@ -272,9 +272,18 @@
Reporter reporter = getReporter(umbilical, getProgress());
// make output collector
- String name = getOutputName(getPartition());
- final RecordWriter out =
- job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, name, reporter);
+ String finalName = getOutputName(getPartition());
+ boolean runSpeculative = job.getSpeculativeExecution();
+ FileSystem fs = FileSystem.get(job) ;
+
+ if( runSpeculative ){
+ fs = new PhasedFileSystem (fs ,
+ getJobId(), getTipId(), getTaskId());
+ }
+
+ final RecordWriter out =
+ job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter) ;
+
OutputCollector collector = new OutputCollector() {
public void collect(WritableComparable key, Writable value)
throws IOException {
@@ -299,6 +308,9 @@
} finally {
reducer.close();
out.close(reporter);
+ if( runSpeculative ){
+ ((PhasedFileSystem)fs).commit();
+ }
}
done(umbilical);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Nov 21 12:40:24 2006
@@ -38,6 +38,7 @@
private String jobFile; // job configuration file
private String taskId; // unique, includes job id
private String jobId; // unique jobid
+ private String tipId ;
private int partition; // id within job
private TaskStatus.Phase phase ; // current phase of the task
@@ -47,10 +48,12 @@
public Task() {}
- public Task(String jobId, String jobFile, String taskId, int partition) {
+ public Task(String jobId, String jobFile, String tipId,
+ String taskId, int partition) {
this.jobFile = jobFile;
this.taskId = taskId;
this.jobId = jobId;
+ this.tipId = tipId;
this.partition = partition;
}
@@ -60,6 +63,7 @@
public void setJobFile(String jobFile) { this.jobFile = jobFile; }
public String getJobFile() { return jobFile; }
public String getTaskId() { return taskId; }
+ public String getTipId(){ return tipId ; }
/**
* Get the job name for this task.
@@ -97,12 +101,14 @@
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, jobFile);
+ UTF8.writeString(out, tipId);
UTF8.writeString(out, taskId);
UTF8.writeString(out, jobId);
out.writeInt(partition);
}
public void readFields(DataInput in) throws IOException {
jobFile = UTF8.readString(in);
+ tipId = UTF8.readString(in);
taskId = UTF8.readString(in);
jobId = UTF8.readString(in);
partition = in.readInt();
@@ -114,6 +120,7 @@
* Localize the given JobConf to be specific for this task.
*/
public void localizeConfiguration(JobConf conf) {
+ conf.set("mapred.tip.id", tipId);
conf.set("mapred.task.id", taskId);
conf.setBoolean("mapred.task.is.map",isMapTask());
conf.setInt("mapred.task.partition", partition);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Nov 21 12:40:24 2006
@@ -18,10 +18,8 @@
package org.apache.hadoop.mapred;
import org.apache.commons.logging.*;
-import org.apache.hadoop.util.*;
import java.text.NumberFormat;
-import java.io.*;
import java.util.*;
@@ -44,6 +42,7 @@
static final int MAX_TASK_FAILURES = 4;
static final double SPECULATIVE_GAP = 0.2;
static final long SPECULATIVE_LAG = 60 * 1000;
+ static final int MAX_CONCURRENT_TASKS = 2;
private static NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setMinimumIntegerDigits(6);
@@ -73,7 +72,9 @@
private boolean failed = false;
private boolean killed = false;
private TreeSet usableTaskIds = new TreeSet();
- private TreeSet recentTasks = new TreeSet();
+ // Map from task Id -> TaskTracker Id, contains tasks that are
+ // currently running
+ private TreeMap<String, String> activeTasks = new TreeMap();
private JobConf conf;
private boolean runSpeculative;
private Map<String,List<String>> taskDiagnosticData = new TreeMap();
@@ -176,7 +177,7 @@
* @return true if any tasks are running
*/
public boolean isRunning() {
- return !recentTasks.isEmpty();
+ return !activeTasks.isEmpty();
}
/**
@@ -326,8 +327,8 @@
status.setFinishTime(System.currentTimeMillis());
}
}
- this.recentTasks.remove(taskid);
- if (this.completes > 0) {
+ this.activeTasks.remove(taskid);
+ if (this.completes > 0 && this.isMapTask()) {
this.completes--;
}
@@ -347,7 +348,7 @@
LOG.info("Task '" + taskid + "' has completed.");
TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
status.setRunState(TaskStatus.State.SUCCEEDED);
- recentTasks.remove(taskid);
+ activeTasks.remove(taskid);
//
// Now that the TIP is complete, the other speculative
@@ -445,11 +446,19 @@
// in more depth eventually...
//
if (isMapTask() &&
- recentTasks.size() <= MAX_TASK_EXECS &&
+ activeTasks.size() <= MAX_TASK_EXECS &&
runSpeculative &&
(averageProgress - progress >= SPECULATIVE_GAP) &&
(System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
return true;
+ }else{
+ //Note: validate criteria for speculative reduce execution
+ if( runSpeculative && (activeTasks.size() < MAX_CONCURRENT_TASKS ) &&
+ (averageProgress - progress >= SPECULATIVE_GAP) &&
+ completes <= 0 &&
+ (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
+ return true ;
+ }
}
return false;
}
@@ -469,13 +478,13 @@
String jobId = job.getProfile().getJobId();
if (isMapTask()) {
- t = new MapTask(jobId, jobFile, taskid, partition, split);
+ t = new MapTask(jobId, jobFile, this.id, taskid, partition, split);
} else {
- t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
+ t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps);
}
t.setConf(conf);
- recentTasks.add(taskid);
+ activeTasks.put(taskid, taskTracker);
// Ask JobTracker to note that the task exists
jobtracker.createTaskEntry(taskid, taskTracker, this);
@@ -491,6 +500,15 @@
return machinesWhereFailed.contains(tracker);
}
+ /**
+ * Is this task currently running on, or has it failed on, this machine?
+ * @param tracker The task tracker name
+ * @return true if the task is active on the tracker or has failed there
+ */
+ public boolean hasRunOnMachine(String tracker){
+ return this.activeTasks.values().contains(tracker) ||
+ hasFailedOnMachine(tracker) ;
+ }
/**
* Get the number of machines where this task has failed.
* @return the size of the failed machine set
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 21 12:40:24 2006
@@ -1044,12 +1044,33 @@
failures += 1;
}
runner.kill();
+ runstate = TaskStatus.State.KILLED;
} else if (runstate == TaskStatus.State.UNASSIGNED) {
if (wasFailure) {
failures += 1;
runstate = TaskStatus.State.FAILED;
} else {
runstate = TaskStatus.State.KILLED;
+ }
+ }
+
+ // the temporary file names in speculative execution are generated in
+ // the launched JVM, and we don't talk to it when killing so cleanup
+ // should happen here. find the task id and delete the temp directory
+ // for the task. only for killed speculative reduce instances
+
+ // Note: it would be better to couple this with delete localfiles
+ // which is in conf currently, it doesn't belong there.
+
+ if( !task.isMapTask() &&
+ this.defaultJobConf.getSpeculativeExecution() ){
+ try{
+ String systemDir = task.getConf().get("mapred.system.dir");
+ String taskTempDir = systemDir + "/" +
+ task.getJobId() + "/" + task.getTipId();
+ fs.delete(new Path(taskTempDir)) ;
+ }catch(IOException e){
+ LOG.warn("Error in deleting reduce temporary output",e);
}
}
}