Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [25/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java Sat Nov 28 20:26:01 2009
@@ -19,89 +19,246 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
+import java.net.URI;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 
 /**
  * A read-only view of the job that is provided to the tasks while they
  * are running.
  */
-public class JobContext {
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface JobContext {
   // Put all of the attribute names in here so that Job and JobContext are
   // consistent.
-  protected static final String INPUT_FORMAT_CLASS_ATTR = 
-    "mapreduce.inputformat.class";
-  protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
-  protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
-  protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
-  protected static final String OUTPUT_FORMAT_CLASS_ATTR = 
-    "mapreduce.outputformat.class";
-  protected static final String PARTITIONER_CLASS_ATTR = 
-    "mapreduce.partitioner.class";
-
-  protected final org.apache.hadoop.mapred.JobConf conf;
-  private final JobID jobId;
-  
-  public JobContext(Configuration conf, JobID jobId) {
-    this.conf = new org.apache.hadoop.mapred.JobConf(conf);
-    this.jobId = jobId;
-  }
+  public static final String INPUT_FORMAT_CLASS_ATTR = 
+    "mapreduce.job.inputformat.class";
+  public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
+  public static final String COMBINE_CLASS_ATTR = 
+    "mapreduce.job.combine.class";
+  public static final String REDUCE_CLASS_ATTR = 
+    "mapreduce.job.reduce.class";
+  public static final String OUTPUT_FORMAT_CLASS_ATTR = 
+    "mapreduce.job.outputformat.class";
+  public static final String PARTITIONER_CLASS_ATTR = 
+    "mapreduce.job.partitioner.class";
+
+  public static final String SETUP_CLEANUP_NEEDED = 
+    "mapreduce.job.committer.setup.cleanup.needed";
+  public static final String JAR = "mapreduce.job.jar";
+  public static final String ID = "mapreduce.job.id";
+  public static final String JOB_NAME = "mapreduce.job.name";
+  public static final String USER_NAME = "mapreduce.job.user.name";
+  public static final String PRIORITY = "mapreduce.job.priority";
+  public static final String QUEUE_NAME = "mapreduce.job.queuename";
+  public static final String JVM_NUMTASKS_TORUN = 
+    "mapreduce.job.jvm.numtasks";
+  public static final String SPLIT_FILE = "mapreduce.job.splitfile";
+  public static final String NUM_MAPS = "mapreduce.job.maps";
+  public static final String MAX_TASK_FAILURES_PER_TRACKER = 
+    "mapreduce.job.maxtaskfailures.per.tracker";
+  public static final String COMPLETED_MAPS_FOR_REDUCE_SLOWSTART =
+    "mapreduce.job.reduce.slowstart.completedmaps";
+  public static final String NUM_REDUCES = "mapreduce.job.reduces";
+  public static final String SKIP_RECORDS = "mapreduce.job.skiprecords";
+  public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
+  public static final String SPECULATIVE_SLOWNODE_THRESHOLD =
+    "mapreduce.job.speculative.slownodethreshold";
+  public static final String SPECULATIVE_SLOWTASK_THRESHOLD = 
+    "mapreduce.job.speculative.slowtaskthreshold";
+  public static final String SPECULATIVECAP = 
+    "mapreduce.job.speculative.speculativecap";
+  public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
+  public static final String OUTPUT_KEY_CLASS = 
+    "mapreduce.job.output.key.class";
+  public static final String OUTPUT_VALUE_CLASS = 
+    "mapreduce.job.output.value.class";
+  public static final String KEY_COMPARATOR = 
+    "mapreduce.job.output.key.comparator.class";
+  public static final String GROUP_COMPARATOR_CLASS = 
+    "mapreduce.job.output.group.comparator.class";
+  public static final String WORKING_DIR = "mapreduce.job.working.dir";
+  public static final String HISTORY_LOCATION = 
+    "mapreduce.job.userhistorylocation"; 
+  public static final String END_NOTIFICATION_URL = 
+    "mapreduce.job.end-notification.url";
+  public static final String END_NOTIFICATION_RETRIES = 
+    "mapreduce.job.end-notification.retry.attempts";
+  public static final String END_NOTIFICATION_RETRY_INTERVAL = 
+    "mapreduce.job.end-notification.retry.interval";
+  public static final String CLASSPATH_ARCHIVES = 
+    "mapreduce.job.classpath.archives";
+  public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files";
+  public static final String CACHE_FILES = "mapreduce.job.cache.files";
+  public static final String CACHE_ARCHIVES = "mapreduce.job.cache.archives";
+  public static final String CACHE_LOCALFILES = 
+    "mapreduce.job.cache.local.files";
+  public static final String CACHE_LOCALARCHIVES = 
+    "mapreduce.job.cache.local.archives";
+  public static final String CACHE_FILE_TIMESTAMPS = 
+    "mapreduce.job.cache.files.timestamps";
+  public static final String CACHE_ARCHIVES_TIMESTAMPS = 
+    "mapreduce.job.cache.archives.timestamps";
+  public static final String CACHE_SYMLINK = 
+    "mapreduce.job.cache.symlink.create";
+  
+  public static final String IO_SORT_FACTOR = 
+    "mapreduce.task.io.sort.factor"; 
+  public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
+  public static final String PRESERVE_FAILED_TASK_FILES = 
+    "mapreduce.task.files.preserve.failedtasks";
+  public static final String PRESERVE_FILES_PATTERN = 
+    "mapreduce.task.files.preserve.filepattern";
+  public static final String TASK_TEMP_DIR = "mapreduce.task.tmp.dir";
+  public static final String TASK_DEBUGOUT_LINES = 
+    "mapreduce.task.debugout.lines";
+  public static final String RECORDS_BEFORE_PROGRESS = 
+    "mapreduce.task.merge.progress.records";
+  public static final String SKIP_START_ATTEMPTS = 
+    "mapreduce.task.skip.start.attempts";
+  public static final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
+  public static final String TASK_ISMAP = "mapreduce.task.ismap";
+  public static final String TASK_PARTITION = "mapreduce.task.partition";
+  public static final String TASK_PROFILE = "mapreduce.task.profile";
+  public static final String TASK_PROFILE_PARAMS = 
+    "mapreduce.task.profile.params";
+  public static final String NUM_MAP_PROFILES = 
+    "mapreduce.task.profile.maps";
+  public static final String NUM_REDUCE_PROFILES = 
+    "mapreduce.task.profile.reduces";
+  public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
+  public static final String TASK_ID = "mapreduce.task.id";
+  public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
+  public static final String TASK_USERLOG_LIMIT = 
+    "mapreduce.task.userlog.limit.kb";
+  public static final String TASK_LOG_RETAIN_HOURS = 
+    "mapred.task.userlog.retain.hours";
+  
+  public static final String MAP_SORT_RECORD_PERCENT = 
+    "mapreduce.map.sort.record.percent";
+  public static final String MAP_SORT_SPILL_PERCENT =
+    "mapreduce.map.sort.spill.percent";
+  public static final String MAP_INPUT_FILE = "mapreduce.map.input.file";
+  public static final String MAP_INPUT_LENGTH = "mapreduce.map.input.length";
+  public static final String MAP_INPUT_START = "mapreduce.map.input.start";
+  public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
+  public static final String MAP_ENV = "mapreduce.map.env";
+  public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
+  public static final String MAP_ULIMIT = "mapreduce.map.ulimit"; 
+  public static final String MAP_MAX_ATTEMPTS = "mapreduce.map.maxattempts";
+  public static final String MAP_DEBUG_SCRIPT = 
+    "mapreduce.map.debug.script";
+  public static final String MAP_SPECULATIVE = "mapreduce.map.speculative";
+  public static final String MAP_FAILURES_MAX_PERCENT = 
+    "mapreduce.map.failures.maxpercent";
+  public static final String MAP_SKIP_INCR_PROC_COUNT = 
+    "mapreduce.map.skip.proc-count.auto-incr";
+  public static final String MAP_SKIP_MAX_RECORDS = 
+    "mapreduce.map.skip.maxrecords";
+  public static final String MAP_COMBINE_MIN_SPILLS = 
+    "mapreduce.map.combine.minspills";
+  public static final String MAP_OUTPUT_COMPRESS = 
+    "mapreduce.map.output.compress";
+  public static final String MAP_OUTPUT_COMPRESS_CODEC = 
+    "mapreduce.map.output.compress.codec";
+  public static final String MAP_OUTPUT_KEY_CLASS = 
+    "mapreduce.map.output.key.class";
+  public static final String MAP_OUTPUT_VALUE_CLASS = 
+    "mapreduce.map.output.value.class";
+  public static final String MAP_OUTPUT_KEY_FIELD_SEPARATOR = 
+    "mapreduce.map.output.key.field.separator";
+  public static final String MAP_LOG_LEVEL = "mapreduce.map.log.level";
+ 
+  public static final String REDUCE_LOG_LEVEL = 
+    "mapreduce.reduce.log.level";
+  public static final String REDUCE_MERGE_INMEM_THRESHOLD = 
+    "mapreduce.reduce.merge.inmem.threshold";
+  public static final String REDUCE_INPUT_BUFFER_PERCENT = 
+    "mapreduce.reduce.input.buffer.percent";
+  public static final String REDUCE_MARKRESET_BUFFER_PERCENT = 
+    "mapreduce.reduce.markreset.buffer.percent";
+  public static final String REDUCE_MARKRESET_BUFFER_SIZE = 
+    "mapreduce.reduce.markreset.buffer.size";
+  public static final String REDUCE_MEMORY_MB = 
+    "mapreduce.reduce.memory.mb";
+  public static final String REDUCE_MEMORY_TOTAL_BYTES = 
+    "mapreduce.reduce.memory.totalbytes";
+  public static final String SHUFFLE_INPUT_BUFFER_PERCENT = 
+    "mapreduce.reduce.shuffle.input.buffer.percent";
+  public static final String SHUFFLE_MERGE_PERCENT = 
+    "mapreduce.reduce.shuffle.merge.percent";
+  public static final String REDUCE_FAILURES_MAXPERCENT = 
+   "mapreduce.reduce.failures.maxpercent";
+  public static final String REDUCE_ENV = "mapreduce.reduce.env";
+  public static final String REDUCE_JAVA_OPTS = 
+    "mapreduce.reduce.java.opts";
+  public static final String REDUCE_ULIMIT = "mapreduce.reduce.ulimit"; 
+  public static final String REDUCE_MAX_ATTEMPTS = 
+    "mapreduce.reduce.maxattempts";
+  public static final String SHUFFLE_PARALLEL_COPIES = 
+    "mapreduce.reduce.shuffle.parallelcopies";
+  public static final String REDUCE_DEBUG_SCRIPT = 
+    "mapreduce.reduce.debug.script";
+  public static final String REDUCE_SPECULATIVE = 
+    "mapreduce.reduce.speculative";
+  public static final String SHUFFLE_CONNECT_TIMEOUT = 
+    "mapreduce.reduce.shuffle.connect.timeout";
+  public static final String SHUFFLE_READ_TIMEOUT = 
+    "mapreduce.reduce.shuffle.read.timeout";
+  public static final String REDUCE_SKIP_INCR_PROC_COUNT = 
+    "mapreduce.reduce.skip.proc-count.auto-incr";
+  public static final String REDUCE_SKIP_MAXGROUPS = 
+    "mapreduce.reduce.skip.maxgroups";
+  public static final String REDUCE_MEMTOMEM_THRESHOLD = 
+    "mapreduce.reduce.merge.memtomem.threshold";
+  public static final String REDUCE_MEMTOMEM_ENABLED = 
+    "mapreduce.reduce.merge.memtomem.enabled";
+  public static final String JOB_TOKEN_FILE = "mapreduce.job.jobTokenFile";
 
   /**
    * Return the configuration for the job.
    * @return the shared configuration object
    */
-  public Configuration getConfiguration() {
-    return conf;
-  }
+  public Configuration getConfiguration();
 
   /**
    * Get the unique ID for the job.
    * @return the object with the job id
    */
-  public JobID getJobID() {
-    return jobId;
-  }
+  public JobID getJobID();
   
   /**
   * Get the configured number of reduce tasks for this job. Defaults to 
    * <code>1</code>.
    * @return the number of reduce tasks for this job.
    */
-  public int getNumReduceTasks() {
-    return conf.getNumReduceTasks();
-  }
+  public int getNumReduceTasks();
   
   /**
    * Get the current working directory for the default file system.
    * 
    * @return the directory name.
    */
-  public Path getWorkingDirectory() throws IOException {
-    return conf.getWorkingDirectory();
-  }
+  public Path getWorkingDirectory() throws IOException;
 
   /**
    * Get the key class for the job output data.
    * @return the key class for the job output data.
    */
-  public Class<?> getOutputKeyClass() {
-    return conf.getOutputKeyClass();
-  }
+  public Class<?> getOutputKeyClass();
   
   /**
    * Get the value class for job outputs.
    * @return the value class for job outputs.
    */
-  public Class<?> getOutputValueClass() {
-    return conf.getOutputValueClass();
-  }
+  public Class<?> getOutputValueClass();
 
   /**
    * Get the key class for the map output data. If it is not set, use the
@@ -109,9 +266,7 @@
    * different than the final output key class.
    * @return the map output key class.
    */
-  public Class<?> getMapOutputKeyClass() {
-    return conf.getMapOutputKeyClass();
-  }
+  public Class<?> getMapOutputKeyClass();
 
   /**
    * Get the value class for the map output data. If it is not set, use the
@@ -120,9 +275,7 @@
    *  
    * @return the map output value class.
    */
-  public Class<?> getMapOutputValueClass() {
-    return conf.getMapOutputValueClass();
-  }
+  public Class<?> getMapOutputValueClass();
 
   /**
    * Get the user-specified job name. This is only used to identify the 
@@ -130,98 +283,68 @@
    * 
    * @return the job's name, defaulting to "".
    */
-  public String getJobName() {
-    return conf.getJobName();
-  }
+  public String getJobName();
 
   /**
    * Get the {@link InputFormat} class for the job.
    * 
    * @return the {@link InputFormat} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends InputFormat<?,?>> getInputFormatClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends InputFormat<?,?>>) 
-      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link Mapper} class for the job.
    * 
    * @return the {@link Mapper} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends Mapper<?,?,?,?>>) 
-      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the combiner class for the job.
    * 
    * @return the combiner class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends Reducer<?,?,?,?>>) 
-      conf.getClass(COMBINE_CLASS_ATTR, null);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link Reducer} class for the job.
    * 
    * @return the {@link Reducer} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends Reducer<?,?,?,?>>) 
-      conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link OutputFormat} class for the job.
    * 
    * @return the {@link OutputFormat} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends OutputFormat<?,?>>) 
-      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link Partitioner} class for the job.
    * 
    * @return the {@link Partitioner} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends Partitioner<?,?>> getPartitionerClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends Partitioner<?,?>>) 
-      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link RawComparator} comparator used to compare keys.
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
    */
-  public RawComparator<?> getSortComparator() {
-    return conf.getOutputKeyComparator();
-  }
+  public RawComparator<?> getSortComparator();
 
   /**
    * Get the pathname of the job's jar.
    * @return the pathname
    */
-  public String getJar() {
-    return conf.getJar();
-  }
+  public String getJar();
 
   /** 
    * Get the user defined {@link RawComparator} comparator for 
@@ -230,18 +353,123 @@
    * @return comparator set by the user for grouping values.
    * @see Job#setGroupingComparatorClass(Class) for details.  
    */
-  public RawComparator<?> getGroupingComparator() {
-    return conf.getOutputValueGroupingComparator();
-  }
+  public RawComparator<?> getGroupingComparator();
   
   /**
   * Get whether job-setup and job-cleanup are needed for the job 
    * 
    * @return boolean 
    */
-  public boolean getJobSetupCleanupNeeded() {
-    return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
-  }
+  public boolean getJobSetupCleanupNeeded();
+
+  /**
+   * Get whether task profiling is enabled.
+   * @return true if some tasks will be profiled
+   */
+  public boolean getProfileEnabled();
+
+  /**
+   * Get the profiler configuration arguments.
+   *
+   * The default value for this property is
+   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
+   * 
+   * @return the parameters to pass to the task child to configure profiling
+   */
+  public String getProfileParams();
+
+  /**
+   * Get the range of maps or reduces to profile.
+   * @param isMap is the task a map?
+   * @return the task ranges
+   */
+  public IntegerRanges getProfileTaskRange(boolean isMap);
+
+  /**
+   * Get the reported username for this job.
+   * 
+   * @return the username
+   */
+  public String getUser();
+  
+  /**
+   * This method checks to see if symlinks are to be created for the 
+   * localized cache files in the current working directory.
+   * @return true if symlinks are to be created, else false
+   */
+  public boolean getSymlink();
+  
+  /**
+   * Get the archive entries in classpath as an array of Path
+   */
+  public Path[] getArchiveClassPaths();
+
+  /**
+   * Get cache archives set in the Configuration
+   * @return A URI array of the caches set in the Configuration
+   * @throws IOException
+   */
+  public URI[] getCacheArchives() throws IOException;
 
+  /**
+   * Get cache files set in the Configuration
+   * @return A URI array of the files set in the Configuration
+   * @throws IOException
+   */
+
+  public URI[] getCacheFiles() throws IOException;
+
+  /**
+   * Return the path array of the localized caches
+   * @return A path array of localized caches
+   * @throws IOException
+   */
+  public Path[] getLocalCacheArchives() throws IOException;
+
+  /**
+   * Return the path array of the localized files
+   * @return A path array of localized files
+   * @throws IOException
+   */
+  public Path[] getLocalCacheFiles() throws IOException;
+
+  /**
+   * Get the file entries in classpath as an array of Path
+   */
+  public Path[] getFileClassPaths();
+  
+  /**
+   * Get the timestamps of the archives.  Used by internal
+   * DistributedCache and MapReduce code.
+   * @return a string array of timestamps 
+   * @throws IOException
+   */
+  public String[] getArchiveTimestamps();
+
+  /**
+   * Get the timestamps of the files.  Used by internal
+   * DistributedCache and MapReduce code.
+   * @return a string array of timestamps 
+   * @throws IOException
+   */
+  public String[] getFileTimestamps();
+
+  /** 
+   * Get the configured number of maximum attempts that will be made to run a
+   * map task, as specified by the <code>mapreduce.map.maxattempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   *  
+   * @return the max number of attempts per map task.
+   */
+  public int getMaxMapAttempts();
+
+  /** 
+   * Get the configured number of maximum attempts that will be made to run a
+   * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   * 
+   * @return the max number of attempts per reduce task.
+   */
+  public int getMaxReduceAttempts();
 
 }
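
For orientation, a minimal task-side sketch of how the interface above is consumed. ConfigProbe is a hypothetical helper, not part of this commit; it only uses getters and constants declared in this file, and the fallback values passed to the raw lookups are assumptions taken from the javadoc above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;

    public class ConfigProbe {
      // Summarize a few job settings, mixing the typed getters with raw
      // Configuration lookups keyed by the new string constants.
      public static String describe(JobContext context) {
        Configuration conf = context.getConfiguration();
        int reduces = context.getNumReduceTasks();
        boolean mapSpeculative =
            conf.getBoolean(JobContext.MAP_SPECULATIVE, true);   // assumed default
        int maxMapAttempts =
            conf.getInt(JobContext.MAP_MAX_ATTEMPTS, 4);         // default 4 per javadoc
        return context.getJobName() + ": reduces=" + reduces
            + ", mapSpeculative=" + mapSpeculative
            + ", maxMapAttempts=" + maxMapAttempts;
      }
    }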

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/MapContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/MapContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/MapContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/MapContext.java Sat Nov 28 20:26:01 2009
@@ -18,9 +18,8 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * The context that is given to the {@link Mapper}.
@@ -29,43 +28,15 @@
  * @param <KEYOUT> the key output type from the Mapper
  * @param <VALUEOUT> the value output type from the Mapper
  */
-public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
   extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-  private RecordReader<KEYIN,VALUEIN> reader;
-  private InputSplit split;
-
-  public MapContext(Configuration conf, TaskAttemptID taskid,
-                    RecordReader<KEYIN,VALUEIN> reader,
-                    RecordWriter<KEYOUT,VALUEOUT> writer,
-                    OutputCommitter committer,
-                    StatusReporter reporter,
-                    InputSplit split) {
-    super(conf, taskid, writer, committer, reporter);
-    this.reader = reader;
-    this.split = split;
-  }
 
   /**
    * Get the input split for this map.
    */
-  public InputSplit getInputSplit() {
-    return split;
-  }
-
-  @Override
-  public KEYIN getCurrentKey() throws IOException, InterruptedException {
-    return reader.getCurrentKey();
-  }
-
-  @Override
-  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
-    return reader.getCurrentValue();
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return reader.nextKeyValue();
-  }
-
+  public InputSplit getInputSplit();
+  
 }
      
\ No newline at end of file
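
With the constructor and the record-advancing overrides gone, getInputSplit() is all that MapContext adds over TaskInputOutputContext. A hedged usage sketch follows; TaggingMapper is hypothetical, and the FileSplit cast assumes a file-based input format.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class TaggingMapper extends Mapper<LongWritable, Text, Text, Text> {
      private Text fileName;

      @Override
      protected void setup(Context context) {
        // The split is fixed for the life of the task, so resolve it once.
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = new Text(split.getPath().getName());
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        context.write(fileName, value);
      }
    }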

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Mapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Mapper.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 
 /** 
  * Maps input key/value pairs to a set of intermediate key/value pairs.  
@@ -94,16 +95,11 @@
  */
 public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
-  public class Context 
-    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-    public Context(Configuration conf, TaskAttemptID taskid,
-                   RecordReader<KEYIN,VALUEIN> reader,
-                   RecordWriter<KEYOUT,VALUEOUT> writer,
-                   OutputCommitter committer,
-                   StatusReporter reporter,
-                   InputSplit split) throws IOException, InterruptedException {
-      super(conf, taskid, reader, writer, committer, reporter, split);
-    }
+  /**
+   * The <code>Context</code> passed on to the {@link Mapper} implementations.
+   */
+  public abstract class Context
+    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   }
   
   /**

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java Sat Nov 28 20:26:01 2009
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
-
 /**
  * <code>OutputCommitter</code> describes the commit of task output for a 
  * Map-Reduce job.
@@ -69,9 +68,38 @@
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
+   * @deprecated Use {@link #commitJob(JobContext)} or
+   *                 {@link #abortJob(JobContext, JobStatus.State)} instead.
+   */
+  @Deprecated
+  public void cleanupJob(JobContext jobContext) throws IOException { }
+
+  /**
+   * For committing a job's output after successful job completion. Note that
+   * this is invoked for jobs with a final run state of SUCCESSFUL.
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  public void commitJob(JobContext jobContext) throws IOException {
+    cleanupJob(jobContext);
+  }
 
+  
+  /**
+   * For aborting an unsuccessful job's output. Note that this is invoked for 
+   * jobs with a final run state of {@link JobStatus.State#FAILED} or 
+   * {@link JobStatus.State#KILLED}.
+   *
+   * @param jobContext Context of the job whose output is being written.
+   * @param state final runstate of the job
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, JobStatus.State state) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
   /**
    * Sets up output for the task.
    * 
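
To make the intended split concrete: commitJob() fires only on a SUCCESSFUL run state, abortJob() on FAILED or KILLED, and both default to the deprecated cleanupJob() so existing committers keep working. A minimal sketch of a committer written against the new hooks; the two protected helpers are hypothetical placeholders, and the remaining abstract task-level methods are omitted.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.mapreduce.OutputCommitter;

    public abstract class SplitHookCommitter extends OutputCommitter {
      @Override
      public void commitJob(JobContext context) throws IOException {
        promoteOutput(context);   // success path
      }

      @Override
      public void abortJob(JobContext context, JobStatus.State state)
          throws IOException {
        discardOutput(context);   // failure/kill path; 'state' says which
      }

      // Hypothetical helpers standing in for real promotion/cleanup logic.
      protected abstract void promoteOutput(JobContext context) throws IOException;
      protected abstract void discardOutput(JobContext context) throws IOException;
    }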

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/ReduceContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/ReduceContext.java Sat Nov 28 20:26:01 2009
@@ -18,22 +18,11 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.NoSuchElementException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.BackupStore;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * The context passed to the {@link Reducer}.
@@ -42,321 +31,32 @@
  * @param <KEYOUT> the class of the output keys
  * @param <VALUEOUT> the class of the output values
  */
-public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-  private RawKeyValueIterator input;
-  private Counter inputCounter;
-  private RawComparator<KEYIN> comparator;
-  private KEYIN key;                                  // current key
-  private VALUEIN value;                              // current value
-  private boolean firstValue = false;                 // first value in key
-  private boolean nextKeyIsSame = false;              // more w/ this key
-  private boolean hasMore;                            // more in file
-  protected Progressable reporter;
-  private Deserializer<KEYIN> keyDeserializer;
-  private Deserializer<VALUEIN> valueDeserializer;
-  private DataInputBuffer buffer = new DataInputBuffer();
-  private BytesWritable currentRawKey = new BytesWritable();
-  private ValueIterable iterable = new ValueIterable();
-  private boolean isMarked = false;
-  private BackupStore<KEYIN,VALUEIN> backupStore;
-  private final SerializationFactory serializationFactory;
-  private final Class<KEYIN> keyClass;
-  private final Class<VALUEIN> valueClass;
-  private final Configuration conf;
-  private final TaskAttemptID taskid;
-  private int currentKeyLength = -1;
-  private int currentValueLength = -1;
-  
-  public ReduceContext(Configuration conf, TaskAttemptID taskid,
-                       RawKeyValueIterator input, 
-                       Counter inputCounter,
-                       RecordWriter<KEYOUT,VALUEOUT> output,
-                       OutputCommitter committer,
-                       StatusReporter reporter,
-                       RawComparator<KEYIN> comparator,
-                       Class<KEYIN> keyClass,
-                       Class<VALUEIN> valueClass
-                       ) throws InterruptedException, IOException{
-    super(conf, taskid, output, committer, reporter);
-    this.input = input;
-    this.inputCounter = inputCounter;
-    this.comparator = comparator;
-    this.serializationFactory = new SerializationFactory(conf);
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-    this.keyDeserializer.open(buffer);
-    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
-    this.valueDeserializer.open(buffer);
-    hasMore = input.next();
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
-    this.conf = conf;
-    this.taskid = taskid;
-  }
 
-  public RawKeyValueIterator getIterator() {
-    return input;
-  }
-  
-  public Counter getInputCounter() {
-    return inputCounter;
-  }
-  
-  public RawComparator<KEYIN> getComparator() {
-    return comparator;
-  }
-  
-  public boolean hasMore() {
-    return hasMore;
-  }
-  
   /** Start processing next unique key. */
-  public boolean nextKey() throws IOException,InterruptedException {
-    while (hasMore && nextKeyIsSame) {
-      nextKeyValue();
-    }
-    if (hasMore) {
-      return nextKeyValue();
-    } else {
-      return false;
-    }
-  }
+  public boolean nextKey() throws IOException,InterruptedException;
 
   /**
-   * Advance to the next key/value pair.
+   * Iterate through the values for the current key, reusing the same value 
+   * object, which is stored in the context.
+   * @return the series of values associated with the current key. All of the 
+   * objects returned directly and indirectly from this method are reused.
    */
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (!hasMore) {
-      key = null;
-      value = null;
-      return false;
-    }
-    firstValue = !nextKeyIsSame;
-    DataInputBuffer nextKey = input.getKey();
-    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
-                      nextKey.getLength() - nextKey.getPosition());
-    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
-    key = keyDeserializer.deserialize(key);
-    DataInputBuffer nextVal = input.getValue();
-    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
-    value = valueDeserializer.deserialize(value);
-
-    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
-    currentValueLength = nextVal.getLength() - nextVal.getPosition();
-
-    if (isMarked) {
-      backupStore.write(nextKey, nextVal);
-    }
-
-    hasMore = input.next();
-    inputCounter.increment(1);
-    if (hasMore) {
-      nextKey = input.getKey();
-      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
-                                     currentRawKey.getLength(),
-                                     nextKey.getData(),
-                                     nextKey.getPosition(),
-                                     nextKey.getLength() - nextKey.getPosition()
-                                         ) == 0;
-    } else {
-      nextKeyIsSame = false;
-    }
-    return true;
-  }
-
-  public KEYIN getCurrentKey() {
-    return key;
-  }
-
-  @Override
-  public VALUEIN getCurrentValue() {
-    return value;
-  }
-  
-
-  
-  protected class ValueIterator implements MarkableIteratorInterface<VALUEIN> {
-
-    private boolean inReset = false;
-    private boolean clearMarkFlag = false;
-    
-    @Override
-    public boolean hasNext() {
-      try {
-        if (inReset && backupStore.hasNext()) {
-          return true;
-        } 
-      } catch (Exception e) {
-        e.printStackTrace();
-        throw new RuntimeException("hasNext failed", e);
-      }
-      return firstValue || nextKeyIsSame;
-    }
+  public Iterable<VALUEIN> getValues() throws IOException, InterruptedException;
 
-    @Override
-    public VALUEIN next() {
-      if (inReset) {
-        try {
-          if (backupStore.hasNext()) {
-            backupStore.next();
-            DataInputBuffer next = backupStore.nextValue();
-            buffer.reset(next.getData(), next.getPosition(), next.getLength());
-            value = valueDeserializer.deserialize(value);
-            return value;
-          } else {
-            inReset = false;
-            backupStore.exitResetMode();
-            if (clearMarkFlag) {
-              clearMarkFlag = false;
-              isMarked = false;
-            }
-          }
-        } catch (IOException e) {
-          e.printStackTrace();
-          throw new RuntimeException("next value iterator failed", e);
-        }
-      } 
-
-      // if this is the first record, we don't need to advance
-      if (firstValue) {
-        firstValue = false;
-        return value;
-      }
-      // if this isn't the first record and the next key is different, they
-      // can't advance it here.
-      if (!nextKeyIsSame) {
-        throw new NoSuchElementException("iterate past last value");
-      }
-      // otherwise, go to the next key/value pair
-      try {
-        nextKeyValue();
-        return value;
-      } catch (IOException ie) {
-        throw new RuntimeException("next value iterator failed", ie);
-      } catch (InterruptedException ie) {
-        // this is bad, but we can't modify the exception list of java.util
-        throw new RuntimeException("next value iterator interrupted", ie);        
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("remove not implemented");
-    }
-
-    @Override
-    public void mark() throws IOException {
-      if (backupStore == null) {
-        backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
-      }
-      isMarked = true;
-      if (!inReset) {
-        backupStore.reinitialize();
-        if (currentKeyLength == -1) {
-          // The user has not called next() for this iterator yet, so
-          // there is no current record to mark and copy to backup store.
-          return;
-        }
-        assert (currentValueLength != -1);
-        int requestedSize = currentKeyLength + currentValueLength + 
-          WritableUtils.getVIntSize(currentKeyLength) +
-          WritableUtils.getVIntSize(currentValueLength);
-        DataOutputStream out = backupStore.getOutputStream(requestedSize);
-        writeFirstKeyValueBytes(out);
-        backupStore.updateCounters(requestedSize);
-      } else {
-        backupStore.mark();
-      }
-    }
-
-    @Override
-    public void reset() throws IOException {
-      // We reached the end of an iteration and user calls a 
-      // reset, but a clearMark was called before, just throw
-      // an exception
-      if (clearMarkFlag) {
-        clearMarkFlag = false;
-        backupStore.clearMark();
-        throw new IOException("Reset called without a previous mark");
-      }
-      
-      if (!isMarked) {
-        throw new IOException("Reset called without a previous mark");
-      }
-      inReset = true;
-      backupStore.reset();
-    }
+  /**
+   * {@link Iterator} to iterate over values for a given group of records.
+   */
+  interface ValueIterator<VALUEIN> extends MarkableIteratorInterface<VALUEIN> {
 
-    @Override
-    public void clearMark() throws IOException {
-      if (backupStore == null) {
-        return;
-      }
-      if (inReset) {
-        clearMarkFlag = true;
-        backupStore.clearMark();
-      } else {
-        inReset = isMarked = false;
-        backupStore.reinitialize();
-      }
-    }
-	  
     /**
      * This method is called when the reducer moves from one key to 
      * another.
      * @throws IOException
      */
-    void resetBackupStore() throws IOException {
-      if (backupStore == null) {
-        return;
-      }
-      inReset = isMarked = false;
-      backupStore.reinitialize();
-      currentKeyLength = -1;
-    }
-
-    /**
-     * This method is called to write the record that was most recently
-     * served (before a call to the mark). Since the framework reads one
-     * record in advance, to get this record, we serialize the current key
-     * and value
-     * @param out
-     * @throws IOException
-     */
-    private void writeFirstKeyValueBytes(DataOutputStream out) 
-    throws IOException {
-      assert (getCurrentKey() != null && getCurrentValue() != null);
-      WritableUtils.writeVInt(out, currentKeyLength);
-      WritableUtils.writeVInt(out, currentValueLength);
-      Serializer<KEYIN> keySerializer = 
-        serializationFactory.getSerializer(keyClass);
-      keySerializer.open(out);
-      keySerializer.serialize(getCurrentKey());
-
-      Serializer<VALUEIN> valueSerializer = 
-        serializationFactory.getSerializer(valueClass);
-      valueSerializer.open(out);
-      valueSerializer.serialize(getCurrentValue());
-    }
-  }
-
-  protected class ValueIterable implements Iterable<VALUEIN> {
-    private ValueIterator iterator = new ValueIterator();
-    @Override
-    public Iterator<VALUEIN> iterator() {
-      return iterator;
-    } 
-  }
-  
-  /**
-   * Iterate through the values for the current key, reusing the same value 
-   * object, which is stored in the context.
-   * @return the series of values associated with the current key. All of the 
-   * objects returned directly and indirectly from this method are reused.
-   */
-  public 
-  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
-    return iterable;
+    void resetBackupStore() throws IOException;
   }
 }
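
A sketch of the consumption pattern the slimmed-down interface defines; ValueCounter is hypothetical. Per the getValues() javadoc above, the framework reuses one value object, so counting is safe here, while caching values across iterations would require copying them.

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.ReduceContext;

    public class ValueCounter {
      // Emit (key, number of values) using the nextKey()/getValues() loop.
      public static void run(ReduceContext<Text, Text, Text, Text> context)
          throws IOException, InterruptedException {
        while (context.nextKey()) {
          long n = 0;
          for (Text ignored : context.getValues()) {
            n++;                      // same Text instance on every iteration
          }
          context.write(context.getCurrentKey(), new Text(Long.toString(n)));
        }
      }
    }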

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Reducer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Reducer.java Sat Nov 28 20:26:01 2009
@@ -117,21 +117,11 @@
  */
 public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
 
-  public class Context 
-    extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-    public Context(Configuration conf, TaskAttemptID taskid,
-                   RawKeyValueIterator input, 
-                   Counter inputCounter,
-                   RecordWriter<KEYOUT,VALUEOUT> output,
-                   OutputCommitter committer,
-                   StatusReporter reporter,
-                   RawComparator<KEYIN> comparator,
-                   Class<KEYIN> keyClass,
-                   Class<VALUEIN> valueClass
-                   ) throws IOException, InterruptedException {
-      super(conf, taskid, input, inputCounter, output, committer, reporter, 
-            comparator, keyClass, valueClass);
-    }
+  /**
+   * The <code>Context</code> passed on to the {@link Reducer} implementations.
+   */
+  public abstract class Context 
+    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   }
 
   /**

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java Sat Nov 28 20:26:01 2009
@@ -18,49 +18,31 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.util.Progressable;
 
 /**
  * The context for task attempts.
  */
-public class TaskAttemptContext extends JobContext implements Progressable {
-  private final TaskAttemptID taskId;
-  private String status = "";
-  
-  public TaskAttemptContext(Configuration conf, 
-                            TaskAttemptID taskId) {
-    super(conf, taskId.getJobID());
-    this.taskId = taskId;
-  }
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TaskAttemptContext extends JobContext, Progressable {
 
   /**
    * Get the unique name for this task attempt.
    */
-  public TaskAttemptID getTaskAttemptID() {
-    return taskId;
-  }
+  public TaskAttemptID getTaskAttemptID();
 
   /**
    * Set the current status of the task to the given string.
    */
-  public void setStatus(String msg) throws IOException {
-    status = msg;
-  }
+  public void setStatus(String msg);
 
   /**
    * Get the last set status message.
    * @return the current status message
    */
-  public String getStatus() {
-    return status;
-  }
+  public String getStatus();
 
-  /**
-   * Report progress. The subtypes actually do work in this method.
-   */
-  public void progress() { 
-  }
 }
\ No newline at end of file
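
Because the interface still extends Progressable, a long-running attempt is expected to call back periodically. A minimal sketch, with a hypothetical chunked workload, of keeping the status current and signalling liveness, which helps an attempt stay under the mapreduce.task.timeout limit declared in JobContext:

    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class ProgressAware {
      public static void runChunks(TaskAttemptContext context, int chunks) {
        for (int i = 0; i < chunks; i++) {
          // ... process chunk i here ...
          context.setStatus("processed chunk " + (i + 1) + "/" + chunks);
          context.progress();   // liveness signal inherited from Progressable
        }
      }
    }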

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java Sat Nov 28 20:26:01 2009
@@ -20,8 +20,8 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * A context object that allows input and output from the task. It is only
@@ -31,28 +31,16 @@
  * @param <KEYOUT> the output key type for the task
  * @param <VALUEOUT> the output value type for the task
  */
-public abstract class TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
-       extends TaskAttemptContext implements Progressable {
-  private RecordWriter<KEYOUT,VALUEOUT> output;
-  private StatusReporter reporter;
-  private OutputCommitter committer;
-
-  public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
-                                RecordWriter<KEYOUT,VALUEOUT> output,
-                                OutputCommitter committer,
-                                StatusReporter reporter) {
-    super(conf, taskid);
-    this.output = output;
-    this.reporter = reporter;
-    this.committer = committer;
-  }
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+       extends TaskAttemptContext {
 
   /**
   * Advance to the next key, value pair, returning false if at end of input.
   * @return true if a key/value pair was read, false if there are no more
    */
-  public abstract 
-  boolean nextKeyValue() throws IOException, InterruptedException;
+  public boolean nextKeyValue() throws IOException, InterruptedException;
  
   /**
    * Get the current key.
@@ -60,8 +48,7 @@
    * @throws IOException
    * @throws InterruptedException
    */
-  public abstract 
-  KEYIN getCurrentKey() throws IOException, InterruptedException;
+  public KEYIN getCurrentKey() throws IOException, InterruptedException;
 
   /**
    * Get the current value.
@@ -69,36 +56,33 @@
    * @throws IOException
    * @throws InterruptedException
    */
-  public abstract VALUEIN getCurrentValue() throws IOException, 
-                                                   InterruptedException;
+  public VALUEIN getCurrentValue() throws IOException, InterruptedException;
 
   /**
    * Generate an output key/value pair.
    */
-  public void write(KEYOUT key, VALUEOUT value
-                    ) throws IOException, InterruptedException {
-    output.write(key, value);
-  }
-
-  public Counter getCounter(Enum<?> counterName) {
-    return reporter.getCounter(counterName);
-  }
-
-  public Counter getCounter(String groupName, String counterName) {
-    return reporter.getCounter(groupName, counterName);
-  }
-
-  @Override
-  public void progress() {
-    reporter.progress();
-  }
-
-  @Override
-  public void setStatus(String status) {
-    reporter.setStatus(status);
-  }
-  
-  public OutputCommitter getOutputCommitter() {
-    return committer;
-  }
+  public void write(KEYOUT key, VALUEOUT value) 
+      throws IOException, InterruptedException;
+
+  /**
+   * Get the {@link Counter} for the given <code>counterName</code>.
+   * @param counterName counter name
+   * @return the <code>Counter</code> for the given <code>counterName</code>
+   */
+  public Counter getCounter(Enum<?> counterName);
+
+  /**
+   * Get the {@link Counter} for the given <code>groupName</code> and 
+   * <code>counterName</code>.
+   * @param groupName counter group name
+   * @param counterName counter name
+   * @return the <code>Counter</code> for the given <code>groupName</code> and 
+   *         <code>counterName</code>
+   */
+  public Counter getCounter(String groupName, String counterName);
+
+  /**
+   * Get the {@link OutputCommitter} for the task-attempt.
+   * @return the <code>OutputCommitter</code> for the task-attempt
+   */
+  public OutputCommitter getOutputCommitter();
 }
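
A sketch showing the input, output, and counter surface together; CopyWithCounters and its enum are illustrative, not part of this commit.

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.TaskInputOutputContext;

    public class CopyWithCounters {
      enum Records { SEEN }

      // Pass records through unchanged while tracking counts, using only
      // the methods declared on TaskInputOutputContext.
      public static void run(
          TaskInputOutputContext<Text, Text, Text, Text> context)
          throws IOException, InterruptedException {
        while (context.nextKeyValue()) {
          context.getCounter(Records.SEEN).increment(1);
          if (context.getCurrentValue().getLength() == 0) {
            context.getCounter("app", "empty-values").increment(1);
          }
          context.write(context.getCurrentKey(), context.getCurrentValue());
        }
      }
    }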

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Sat Nov 28 20:26:01 2009
@@ -24,6 +24,8 @@
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 
 import java.net.URI;
 
@@ -125,6 +127,7 @@
  * @see org.apache.hadoop.mapred.JobConf
  * @see org.apache.hadoop.mapred.JobClient
  */
+@Deprecated
 public class DistributedCache {
   /**
    * Get the locally cached file or archive; it could either be 
@@ -148,9 +151,10 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework. 
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, FileStatus fileStatus,
                                    boolean isArchive, long confFileStamp,
@@ -185,16 +189,17 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework. 
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static Path getLocalCache(URI cache, Configuration conf, 
       Path baseDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
     return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
-        baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+        baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
         honorSymLinkConf);
   }
 
@@ -219,9 +224,10 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework.  
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, boolean isArchive,
                                    long confFileStamp, Path currentWorkDir) 
@@ -238,15 +244,44 @@
    * @param conf configuration which contains the filesystem the cache 
    * is contained in.
    * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework. 
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static void releaseCache(URI cache, Configuration conf)
       throws IOException {
-    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
+    // find the timestamp of the URI
+    URI[] archives = DistributedCache.getCacheArchives(conf);
+    URI[] files = DistributedCache.getCacheFiles(conf);
+    String[] archivesTimestamps =
+          DistributedCache.getArchiveTimestamps(conf);
+    String[] filesTimestamps =
+          DistributedCache.getFileTimestamps(conf);
+    String timestamp = null;
+    if (archives != null) {
+      for (int i = 0; i < archives.length; i++) {
+        if (archives[i].equals(cache)) {
+          timestamp = archivesTimestamps[i];
+          break;
+        }
+      }
+    }
+    if (timestamp == null && files != null) {
+      for (int i = 0; i < files.length; i++) {
+        if (files[i].equals(cache)) {
+          timestamp = filesTimestamps[i];
+          break;
+        }
+      }
+    }
+    if (timestamp == null) {
+      throw new IOException("TimeStamp of the uri couldnot be found");
+    }
+    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
+          Long.parseLong(timestamp));
   }
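
Per the deprecation notes in the hunks below, the replacement surface is the Job setters plus the JobContext getters. A minimal migration sketch, with an illustrative path:

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;

    public class CacheMigration {
      // Submit side: register cache entries on the Job instead of through
      // the deprecated static setters on DistributedCache.
      static void submitSide(Job job) {
        job.setCacheFiles(new URI[] { URI.create("hdfs:///shared/lookup.dat") });
      }

      // Task side: read them back through the JobContext getters.
      static void taskSide(JobContext context) throws IOException {
        URI[] files = context.getCacheFiles();   // may be null if none set
        if (files != null) {
          for (URI cached : files) {
            System.out.println("cache file: " + cached);
          }
        }
      }
    }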
   
-  /*
+  /**
   * Returns the relative path of the dir this cache will be
   * localized in. For
    * hdfs://hostname:port/absolute_path -- the relative path is
@@ -256,6 +291,7 @@
    * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
    * instead.
    */
+  @Deprecated
   public static String makeRelative(URI cache, Configuration conf)
       throws IOException {
     return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
@@ -268,13 +304,13 @@
    * @param cache cache file 
    * @return mtime of a given cache file on hdfs
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  
+   * Use {@link TrackerDistributedCacheManager} instead.
    */
+  @Deprecated
   public static long getTimestamp(Configuration conf, URI cache)
     throws IOException {
-    FileSystem fileSystem = FileSystem.get(cache, conf);
-    Path filePath = new Path(cache.getPath());
-
-    return fileSystem.getFileStatus(filePath).getModificationTime();
+    return TrackerDistributedCacheManager.getTimestamp(conf, cache);
   }
 
   /**
@@ -286,6 +322,7 @@
    * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
    * instead.
    */
+  @Deprecated
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
       throws IOException{
     TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
@@ -296,10 +333,12 @@
    * to be used by user code.
    * @param archives The list of archives that need to be localized
    * @param conf Configuration which will be changed
+   * @deprecated Use {@link Job#setCacheArchives(URI[])} instead
    */
+  @Deprecated
   public static void setCacheArchives(URI[] archives, Configuration conf) {
     String sarchives = StringUtils.uriToString(archives);
-    conf.set("mapred.cache.archives", sarchives);
+    conf.set(JobContext.CACHE_ARCHIVES, sarchives);
   }
 
   /**
@@ -307,10 +346,12 @@
    * used by user code.
    * @param files The list of files that need to be localized
    * @param conf Configuration which will be changed
+   * @deprecated Use {@link Job#setCacheFiles(URI[])} instead
    */
+  @Deprecated
   public static void setCacheFiles(URI[] files, Configuration conf) {
     String sfiles = StringUtils.uriToString(files);
-    conf.set("mapred.cache.files", sfiles);
+    conf.set(JobContext.CACHE_FILES, sfiles);
   }
 
   /**
@@ -319,9 +360,11 @@
    * @param conf The configuration which contains the archives
    * @return A URI array of the caches set in the Configuration
    * @throws IOException
+   * @deprecated Use {@link JobContext#getCacheArchives()} instead
    */
+  @Deprecated
   public static URI[] getCacheArchives(Configuration conf) throws IOException {
-    return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
+    return StringUtils.stringToURI(conf.getStrings(JobContext.CACHE_ARCHIVES));
   }
 
   /**
@@ -330,9 +373,11 @@
    * @param conf The configuration which contains the files
    * @return A URI array of the files set in the Configuration
    * @throws IOException
+   * @deprecated Use {@link JobContext#getCacheFiles()} instead
    */
+  @Deprecated
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
-    return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
+    return StringUtils.stringToURI(conf.getStrings(JobContext.CACHE_FILES));
   }
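
On the read side, the tag points at JobContext#getCacheFiles(); the task-side Context objects inherit the JobContext accessors. A sketch inside a mapper's setup, assuming that inheritance as the tag indicates:

    import java.io.IOException;
    import java.net.URI;

    // Inside a Mapper subclass; this replaces the static getter.
    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      URI[] cacheFiles = context.getCacheFiles();
      if (cacheFiles != null) {
        for (URI uri : cacheFiles) {
          // e.g. match uri.getFragment() to locate a particular entry
        }
      }
    }
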
 
   /**
@@ -341,11 +386,13 @@
    * @param conf Configuration that contains the localized archives
    * @return A path array of localized caches
    * @throws IOException
+   * @deprecated Use {@link JobContext#getLocalCacheArchives()} instead
    */
+  @Deprecated
   public static Path[] getLocalCacheArchives(Configuration conf)
     throws IOException {
     return StringUtils.stringToPath(conf
-                                    .getStrings("mapred.cache.localArchives"));
+                                    .getStrings(JobContext.CACHE_LOCALARCHIVES));
   }
 
   /**
@@ -354,10 +401,12 @@
    * @param conf Configuration that contains the localized files
    * @return A path array of localized files
    * @throws IOException
+   * @deprecated Use {@link JobContext#getLocalCacheFiles()} instead
    */
+  @Deprecated
   public static Path[] getLocalCacheFiles(Configuration conf)
     throws IOException {
-    return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
+    return StringUtils.stringToPath(conf.getStrings(JobContext.CACHE_LOCALFILES));
   }
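
The local variants return paths on the task's own disk, so plain java.io works on them. Continuing the setup(Context) sketch above, a hedged example of reading the first localized file (replacement accessor per the tag):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import org.apache.hadoop.fs.Path;

    // Localized copies are ordinary local files; no FileSystem needed.
    Path[] localFiles = context.getLocalCacheFiles();
    if (localFiles != null && localFiles.length > 0) {
      BufferedReader reader =
          new BufferedReader(new FileReader(localFiles[0].toString()));
      try {
        // ... consume side data line by line ...
      } finally {
        reader.close();
      }
    }
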
 
   /**
@@ -366,9 +415,11 @@
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
+   * @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
    */
+  @Deprecated
   public static String[] getArchiveTimestamps(Configuration conf) {
-    return conf.getStrings("mapred.cache.archives.timestamps");
+    return conf.getStrings(JobContext.CACHE_ARCHIVES_TIMESTAMPS);
   }
 
 
@@ -378,9 +429,11 @@
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
+   * @deprecated Use {@link JobContext#getFileTimestamps()} instead
    */
+  @Deprecated
   public static String[] getFileTimestamps(Configuration conf) {
-    return conf.getStrings("mapred.cache.files.timestamps");
+    return conf.getStrings(JobContext.CACHE_FILE_TIMESTAMPS);
   }
 
   /**
@@ -389,9 +442,13 @@
   * @param conf Configuration which stores the timestamps
    * @param timestamps comma separated list of timestamps of archives.
    * The order should be the same as the order in which the archives are added.
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#setArchiveTimestamps(Configuration, String)}
+   * instead
    */
+  @Deprecated
   public static void setArchiveTimestamps(Configuration conf, String timestamps) {
-    conf.set("mapred.cache.archives.timestamps", timestamps);
+    TrackerDistributedCacheManager.setArchiveTimestamps(conf, timestamps);
   }
 
   /**
@@ -400,9 +457,13 @@
   * @param conf Configuration which stores the timestamps
    * @param timestamps comma separated list of timestamps of files.
    * The order should be the same as the order in which the files are added.
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#setFileTimestamps(Configuration, String)}
+   * instead
    */
+  @Deprecated
   public static void setFileTimestamps(Configuration conf, String timestamps) {
-    conf.set("mapred.cache.files.timestamps", timestamps);
+    TrackerDistributedCacheManager.setFileTimestamps(conf, timestamps);
   }
   
   /**
@@ -410,9 +471,13 @@
    * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local archives
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#setLocalArchives(Configuration, String)}
+   * instead
    */
+  @Deprecated
   public static void setLocalArchives(Configuration conf, String str) {
-    conf.set("mapred.cache.localArchives", str);
+    TrackerDistributedCacheManager.setLocalArchives(conf, str);
   }
 
   /**
@@ -420,9 +485,13 @@
    * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local files
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#setLocalFiles(Configuration, String)}
+   * instead
    */
+  @Deprecated
   public static void setLocalFiles(Configuration conf, String str) {
-    conf.set("mapred.cache.localFiles", str);
+    TrackerDistributedCacheManager.setLocalFiles(conf, str);
   }
 
   /**
@@ -430,10 +499,12 @@
    * be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
+   * @deprecated Use {@link Job#addCacheArchive(URI)} instead
    */
+  @Deprecated
   public static void addCacheArchive(URI uri, Configuration conf) {
-    String archives = conf.get("mapred.cache.archives");
-    conf.set("mapred.cache.archives", archives == null ? uri.toString()
+    String archives = conf.get(JobContext.CACHE_ARCHIVES);
+    conf.set(JobContext.CACHE_ARCHIVES, archives == null ? uri.toString()
              : archives + "," + uri.toString());
   }
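
The add* setters accumulate by comma-appending onto the same key, which is why the first call behaves differently from later ones. A small sketch of the resulting value (the key constant is the one used in the code above; its literal string is not shown in this hunk):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;

    static void showAppendSemantics() {
      Configuration conf = new Configuration();
      DistributedCache.addCacheArchive(URI.create("hdfs://nn/a.zip"), conf);
      DistributedCache.addCacheArchive(URI.create("hdfs://nn/b.zip"), conf);
      // First call sets the key, the second comma-appends, so the
      // stored value is "hdfs://nn/a.zip,hdfs://nn/b.zip"
    }
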
   
@@ -442,10 +513,12 @@
    * to be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
+   * @deprecated Use {@link Job#addCacheFile(URI)} instead
    */
+  @Deprecated
   public static void addCacheFile(URI uri, Configuration conf) {
-    String files = conf.get("mapred.cache.files");
-    conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
+    String files = conf.get(JobContext.CACHE_FILES);
+    conf.set(JobContext.CACHE_FILES, files == null ? uri.toString() : files + ","
              + uri.toString());
   }
 
@@ -455,11 +528,13 @@
    * 
    * @param file Path of the file to be added
    * @param conf Configuration that contains the classpath setting
+   * @deprecated Use {@link Job#addFileToClassPath(Path)} instead
    */
+  @Deprecated
   public static void addFileToClassPath(Path file, Configuration conf)
     throws IOException {
-    String classpath = conf.get("mapred.job.classpath.files");
-    conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
+    String classpath = conf.get(JobContext.CLASSPATH_FILES);
+    conf.set(JobContext.CLASSPATH_FILES, classpath == null ? file.toString()
              : classpath + "," + file.toString());
     FileSystem fs = FileSystem.get(conf);
     URI uri = fs.makeQualified(file).toUri();
@@ -472,10 +547,12 @@
    * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
+   * @deprecated Use {@link JobContext#getFileClassPaths()} instead 
    */
+  @Deprecated
   public static Path[] getFileClassPaths(Configuration conf) {
     ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
-                                "mapred.job.classpath.files");
+                                JobContext.CLASSPATH_FILES);
     if (list.size() == 0) { 
       return null; 
     }
@@ -492,11 +569,13 @@
    * 
    * @param archive Path of the archive to be added
    * @param conf Configuration that contains the classpath setting
+   * @deprecated Use {@link Job#addArchiveToClassPath(Path)} instead
    */
+  @Deprecated
   public static void addArchiveToClassPath(Path archive, Configuration conf)
     throws IOException {
-    String classpath = conf.get("mapred.job.classpath.archives");
-    conf.set("mapred.job.classpath.archives", classpath == null ? archive
+    String classpath = conf.get(JobContext.CLASSPATH_ARCHIVES);
+    conf.set(JobContext.CLASSPATH_ARCHIVES, classpath == null ? archive
              .toString() : classpath + "," + archive.toString());
     FileSystem fs = FileSystem.get(conf);
     URI uri = fs.makeQualified(archive).toUri();
@@ -509,10 +588,12 @@
    * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
+   * @deprecated Use {@link JobContext#getArchiveClassPaths()} instead 
    */
+  @Deprecated
   public static Path[] getArchiveClassPaths(Configuration conf) {
     ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
-                                "mapred.job.classpath.archives");
+                                JobContext.CLASSPATH_ARCHIVES);
     if (list.size() == 0) { 
       return null; 
     }
@@ -527,10 +608,12 @@
    * This method allows you to create symlinks in the current working directory
    * of the task to all the cache files/archives.
    * Intended to be used by user code.
-   * @param conf the jobconf 
+   * @param conf the JobConf
+   * @deprecated Use {@link Job#createSymlink()} instead  
+   * @deprecated Use {@link Job#createSymlink()} instead  
    */
+  @Deprecated
   public static void createSymlink(Configuration conf){
-    conf.set("mapred.create.symlink", "yes");
+    conf.set(JobContext.CACHE_SYMLINK, "yes");
   }
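
Fragments and symlinks work together: a URI fragment names the link, and createSymlink (now Job#createSymlink() per the tag) asks the framework to materialize it in the task's working directory. A hedged end-to-end sketch with illustrative paths:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    static void attachLookupTable(Configuration conf) throws Exception {
      Job job = new Job(conf);
      // The "#lookup" fragment names the symlink:
      job.addCacheFile(URI.create("hdfs://nn/data/lookup.dat#lookup"));
      job.createSymlink();
      // At task time the file is reachable as ./lookup in the working dir.
    }
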
   
   /**
@@ -539,9 +622,11 @@
    * Used by internal DistributedCache code.
    * @param conf the jobconf
   * @return true if symlinks are to be created, else false
+   * @deprecated Use {@link JobContext#getSymlink()} instead
    */
+  @Deprecated
   public static boolean getSymlink(Configuration conf){
-    String result = conf.get("mapred.create.symlink");
+    String result = conf.get(JobContext.CACHE_SYMLINK);
     if ("yes".equals(result)){
       return true;
     }
@@ -555,52 +640,22 @@
    * the various archives and files.  May be used by user code.
   * @param uriFiles The URI array of cache files
   * @param uriArchives The URI array of cache archives
+   * @deprecated Use 
+   * {@link TrackerDistributedCacheManager#checkURIs(URI[], URI[])} instead
    */
+  @Deprecated
   public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
-    if ((uriFiles == null) && (uriArchives == null)){
-      return true;
-    }
-    if (uriFiles != null){
-      for (int i = 0; i < uriFiles.length; i++){
-        String frag1 = uriFiles[i].getFragment();
-        if (frag1 == null)
-          return false;
-        for (int j=i+1; j < uriFiles.length; j++){
-          String frag2 = uriFiles[j].getFragment();
-          if (frag2 == null)
-            return false;
-          if (frag1.equalsIgnoreCase(frag2))
-            return false;
-        }
-        if (uriArchives != null){
-          for (int j = 0; j < uriArchives.length; j++){
-            String frag2 = uriArchives[j].getFragment();
-            if (frag2 == null){
-              return false;
-            }
-            if (frag1.equalsIgnoreCase(frag2))
-              return false;
-            for (int k=j+1; k < uriArchives.length; k++){
-              String frag3 = uriArchives[k].getFragment();
-              if (frag3 == null)
-                return false;
-              if (frag2.equalsIgnoreCase(frag3))
-                return false;
-            }
-          }
-        }
-      }
-    }
-    return true;
+    return TrackerDistributedCacheManager.checkURIs(uriFiles, uriArchives);
   }
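
The inlined validation deleted here required every URI to carry a fragment, with all fragments unique case-insensitively across files and archives; note that, as written, it skipped the archive-only checks whenever uriFiles was null. A compact restatement of the intended rule using a set (illustrative, not the delegate's actual implementation):

    import java.net.URI;
    import java.util.HashSet;
    import java.util.Set;

    // Intended invariant: every URI carries a fragment, and no two
    // fragments collide ignoring case, across both arrays combined.
    static boolean fragmentsAreUsable(URI[] files, URI[] archives) {
      Set<String> seen = new HashSet<String>();
      for (URI[] group : new URI[][] { files, archives }) {
        if (group == null) {
          continue;
        }
        for (URI u : group) {
          String frag = u.getFragment();
          if (frag == null || !seen.add(frag.toLowerCase())) {
            return false;   // missing fragment, or duplicate link name
          }
        }
      }
      return true;
    }
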
 
   /**
    * Clear the entire contents of the cache and delete the backing files. This
    * should only be used when the server is reinitializing, because the users
    * are going to lose their files.
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
+   * @deprecated Internal to MapReduce framework. 
+   * Use TrackerDistributedCacheManager instead.
    */
+  @Deprecated
   public static void purgeCache(Configuration conf) throws IOException {
     new TrackerDistributedCacheManager(conf).purgeCache();
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Sat Nov 28 20:26:01 2009
@@ -30,10 +30,9 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -97,7 +96,7 @@
         Map<String, Path> classPaths = new HashMap<String, Path>();
         if (paths != null) {
           for (Path p : paths) {
-            classPaths.put(p.toString(), p);
+            classPaths.put(p.toUri().getPath().toString(), p);
           }
         }
         for (int i = 0; i < uris.length; ++i) {
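
The key change in this hunk indexes classPaths by the URI's path component rather than the full Path string, so later lookups keyed on bare paths can match entries whose original URIs carried a scheme, authority, or fragment; that reading is ours, not stated in the commit. Also note getPath() already returns a String, so the trailing toString() is a no-op. A sketch of the difference (URI illustrative):

    import java.net.URI;

    URI u = URI.create("hdfs://namenode:8020/user/me/lib.jar#lib");
    String full = u.toString(); // "hdfs://namenode:8020/user/me/lib.jar#lib"
    String key  = u.getPath();  // "/user/me/lib.jar" -- stable lookup key
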
@@ -152,13 +151,9 @@
       URI uri = cacheFile.uri;
       FileSystem fileSystem = FileSystem.get(uri, taskConf);
       FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-      String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
-      String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
-      Path localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                fileStatus.getLen(), taskConf);
-      String baseDir = localPath.toString().replace(cacheId, "");
+
       Path p = distributedCacheManager.getLocalCache(uri, taskConf,
-          new Path(baseDir), fileStatus, 
+          cacheSubdir, fileStatus, 
           cacheFile.type == CacheFile.FileType.ARCHIVE,
           cacheFile.timestamp, workdirPath, false);
 
@@ -174,11 +169,12 @@
 
     // Update the configuration object with localized data.
     if (!localArchives.isEmpty()) {
-      DistributedCache.setLocalArchives(taskConf, 
+      TrackerDistributedCacheManager.setLocalArchives(taskConf, 
         stringifyPathList(localArchives));
     }
     if (!localFiles.isEmpty()) {
-      DistributedCache.setLocalFiles(taskConf, stringifyPathList(localFiles));
+      TrackerDistributedCacheManager.setLocalFiles(taskConf,
+        stringifyPathList(localFiles));
     }
 
   }
@@ -214,7 +210,7 @@
    */
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
-      distributedCacheManager.releaseCache(c.uri, taskConf);
+      distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
     }
   }
 
@@ -222,7 +218,7 @@
    * Creates a class loader that includes the designated
    * files and archives.
    */
-  public ClassLoader makeClassLoader(final ClassLoader parent) 
+  public ClassLoader makeClassLoader(final ClassLoader parent)
       throws MalformedURLException {
     final URL[] urls = new URL[classPaths.size()];
     for (int i = 0; i < classPaths.size(); ++i) {
@@ -232,7 +228,7 @@
       @Override
       public ClassLoader run() {
         return new URLClassLoader(urls, parent);
-      }     
+      }
     });
   }
 }
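
The doPrivileged wrapper shown in the last hunk is the standard pattern for constructing a URLClassLoader when the calling code may lack createClassLoader permission. A self-contained sketch of the same pattern; the jar path is illustrative:

    import java.io.File;
    import java.net.MalformedURLException;
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.security.AccessController;
    import java.security.PrivilegedAction;

    // The loader is built inside doPrivileged so the security check runs
    // against this code's permissions, not the whole call stack's.
    static ClassLoader makeLoader(final ClassLoader parent)
        throws MalformedURLException {
      final URL[] urls = {
          new File("/local/taskTracker/cache/job.jar").toURI().toURL() };
      return AccessController.doPrivileged(
          new PrivilegedAction<ClassLoader>() {
            @Override
            public ClassLoader run() {
              return new URLClassLoader(urls, parent);
            }
          });
    }
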