Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/11/11 05:07:12 UTC

svn commit: r1033815 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/java/org/apache/hadoop/mapreduce/lib/join/ src/java/org/ap...

Author: amareshwari
Date: Thu Nov 11 04:07:11 2010
New Revision: 1033815

URL: http://svn.apache.org/viewvc?rev=1033815&view=rev
Log:
Merge -r 1033813:1033814 from trunk

Added:
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java
      - copied unchanged from r1033814, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java
Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Thu Nov 11 04:07:11 2010
@@ -28,6 +28,9 @@ Release 0.21.1 - Unreleased
     MAPREDUCE-2032. Fixes TestJobCleanup to cleanup test directory in
     tearDown. (Dick King via amareshwari)
 
+    MAPREDUCE-1905. Fixes Context.setStatus() and progress() apis.
+    (amareshwari)
+
 Release 0.21.0 - 2010-08-13
 
   INCOMPATIBLE CHANGES

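For context on the MAPREDUCE-1905 entry above: before this change, setStatus() and progress() on the new-API task attempt contexts only updated local state (or were no-ops), and getCounter() was declared only on TaskInputOutputContext, so record readers holding just a TaskAttemptContext could not reach counters. The sketch below is illustrative only and is not part of this commit; the mapper class name and the Records counter enum are made up. It shows the kind of user code whose status, progress, and counter calls now reach the task's Reporter after this fix.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper used purely to illustrate the fixed context APIs.
public class StatusReportingMapper
    extends Mapper<LongWritable, Text, Text, LongWritable> {

  // Illustrative counter enum; not defined anywhere in this commit.
  enum Records { PROCESSED }

  private static final LongWritable ONE = new LongWritable(1);

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // With MAPREDUCE-1905, the status string and progress ping are forwarded
    // to the underlying Reporter instead of being dropped by the context.
    context.setStatus("processing offset " + key.get());
    context.progress();

    // getCounter() now lives on TaskAttemptContext, so code that only holds a
    // TaskAttemptContext (e.g. LineRecordReader above) can update counters.
    context.getCounter(Records.PROCESSED).increment(1);

    context.write(value, ONE);
  }
}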
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java Thu Nov 11 04:07:11 2010
@@ -607,7 +607,8 @@ class MapTask extends Task {
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
       new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
-                                                                  getTaskID());
+                                                                  getTaskID(),
+                                                                  reporter);
     // make a mapper
     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Nov 11 04:07:11 2010
@@ -521,7 +521,8 @@ public class ReduceTask extends Task {
     };
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID());
+      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
+          getTaskID(), reporter);
     // make a reducer
     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Task.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Task.java Thu Nov 11 04:07:11 2010
@@ -1311,7 +1311,8 @@ abstract public class Task implements Wr
       }
       // make a task context so we can get the classes
       org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId);
+        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
+            reporter);
       Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls = 
         (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
            taskContext.getCombinerClass();

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java Thu Nov 11 04:07:11 2010
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -28,16 +29,16 @@ import org.apache.hadoop.util.Progressab
 public class TaskAttemptContextImpl
        extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
        implements TaskAttemptContext {
-  private Progressable progress;
+  private Reporter reporter;
 
   TaskAttemptContextImpl(JobConf conf, TaskAttemptID taskid) {
     this(conf, taskid, Reporter.NULL);
   }
   
   TaskAttemptContextImpl(JobConf conf, TaskAttemptID taskid,
-                         Progressable progress) {
+                         Reporter reporter) {
     super(conf, taskid);
-    this.progress = progress;
+    this.reporter = reporter;
   }
   
   /**
@@ -50,7 +51,7 @@ public class TaskAttemptContextImpl
   }
   
   public Progressable getProgressible() {
-    return progress;
+    return reporter;
   }
   
   public JobConf getJobConf() {
@@ -58,7 +59,31 @@ public class TaskAttemptContextImpl
   }
 
   @Override
+  public Counter getCounter(Enum<?> counterName) {
+    return reporter.getCounter(counterName);
+  }
+
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    return reporter.getCounter(groupName, counterName);
+  }
+
+  /**
+   * Report progress.
+   */
+  @Override
   public void progress() {
-    progress.progress();
+    reporter.progress();
   }
+
+  /**
+   * Set the current status of the task to the given string.
+   */
+  @Override
+  public void setStatus(String status) {
+    setStatusString(status);
+    reporter.setStatus(status);
+  }
+
+
 }

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java Thu Nov 11 04:07:11 2010
@@ -45,4 +45,20 @@ public interface TaskAttemptContext exte
    */
   public String getStatus();
 
+  /**
+   * Get the {@link Counter} for the given <code>counterName</code>.
+   * @param counterName counter name
+   * @return the <code>Counter</code> for the given <code>counterName</code>
+   */
+  public Counter getCounter(Enum<?> counterName);
+
+  /**
+   * Get the {@link Counter} for the given <code>groupName</code> and 
+   * <code>counterName</code>.
+   * @param counterName counter name
+   * @return the <code>Counter</code> for the given <code>groupName</code> and 
+   *         <code>counterName</code>
+   */
+  public Counter getCounter(String groupName, String counterName);
+
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java Thu Nov 11 04:07:11 2010
@@ -65,22 +65,6 @@ public interface TaskInputOutputContext<
       throws IOException, InterruptedException;
 
   /**
-   * Get the {@link Counter} for the given <code>counterName</code>.
-   * @param counterName counter name
-   * @return the <code>Counter</code> for the given <code>counterName</code>
-   */
-  public Counter getCounter(Enum<?> counterName);
-
-  /**
-   * Get the {@link Counter} for the given <code>groupName</code> and 
-   * <code>counterName</code>.
-   * @param counterName counter name
-   * @return the <code>Counter</code> for the given <code>groupName</code> and 
-   *         <code>counterName</code>
-   */
-  public Counter getCounter(String groupName, String counterName);
-
-  /**
    * Get the {@link OutputCommitter} for the task-attempt.
    * @return the <code>OutputCommitter</code> for the task-attempt
    */

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Thu Nov 11 04:07:11 2010
@@ -39,11 +39,9 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.util.LineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.fs.Seekable;
 
 /**
  * Treats keys as offset in file and value as line. 
@@ -72,7 +70,7 @@ public class LineRecordReader extends Re
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
     FileSplit split = (FileSplit) genericSplit;
-    inputByteCounter = ((MapContext)context).getCounter(
+    inputByteCounter = context.getCounter(
       FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
     Configuration job = context.getConfiguration();
     this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java Thu Nov 11 04:07:11 2010
@@ -36,12 +36,14 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -343,8 +345,8 @@ public abstract static class Node extend
         Configuration conf = getConf(taskContext.getConfiguration());
         TaskAttemptContext context = 
           new TaskAttemptContextImpl(conf, 
-                                     TaskAttemptID.forName(
-                                         conf.get(MRJobConfig.TASK_ATTEMPT_ID)));
+              TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID)), 
+              new WrappedStatusReporter(taskContext));
         return rrCstrMap.get(ident).newInstance(id,
             inf.createRecordReader(split, context), cmpcl);
       } catch (IllegalAccessException e) {
@@ -361,6 +363,34 @@ public abstract static class Node extend
     }
   }
 
+  private static class WrappedStatusReporter extends StatusReporter {
+
+    TaskAttemptContext context;
+    
+    public WrappedStatusReporter(TaskAttemptContext context) {
+      this.context = context; 
+    }
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return context.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return context.getCounter(group, name);
+    }
+
+    @Override
+    public void progress() {
+      context.progress();
+    }
+
+    @Override
+    public void setStatus(String status) {
+      context.setStatus(status);
+    }
+  }
+
   /**
    * Internal nodetype for &quot;composite&quot; InputFormats.
    */

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java Thu Nov 11 04:07:11 2010
@@ -386,7 +386,8 @@ public class MultipleOutputs<KEYOUT, VAL
     checkBaseOutputPath(baseOutputPath);
     TaskAttemptContext taskContext = 
       new TaskAttemptContextImpl(context.getConfiguration(), 
-                                 context.getTaskAttemptID());
+                                 context.getTaskAttemptID(),
+                                 new WrappedStatusReporter(context));
     getRecordWriter(taskContext, baseOutputPath).write(key, value);
   }
 
@@ -440,15 +441,43 @@ public class MultipleOutputs<KEYOUT, VAL
     job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
     job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
     job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
-    taskContext = 
-      new TaskAttemptContextImpl(job.getConfiguration(), 
-                                 context.getTaskAttemptID());
+    taskContext = new TaskAttemptContextImpl(job.getConfiguration(), context
+        .getTaskAttemptID(), new WrappedStatusReporter(context));
 
     taskContexts.put(nameOutput, taskContext);
 
     return taskContext;
   }
-  
+
+  private static class WrappedStatusReporter extends StatusReporter {
+
+    TaskAttemptContext context;
+
+    public WrappedStatusReporter(TaskAttemptContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return context.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return context.getCounter(group, name);
+    }
+
+    @Override
+    public void progress() {
+      context.progress();
+    }
+
+    @Override
+    public void setStatus(String status) {
+      context.setStatus(status);
+    }
+  }
+
   /**
    * Closes all the opened outputs.
    * 

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java Thu Nov 11 04:07:11 2010
@@ -21,6 +21,9 @@ package org.apache.hadoop.mapreduce.task
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
@@ -33,11 +36,18 @@ public class TaskAttemptContextImpl exte
     implements TaskAttemptContext {
   private final TaskAttemptID taskId;
   private String status = "";
-  
+  private StatusReporter reporter;
+
   public TaskAttemptContextImpl(Configuration conf, 
                                 TaskAttemptID taskId) {
+    this(conf, taskId, new DummyReporter());
+  }
+
+  public TaskAttemptContextImpl(Configuration conf, 
+      TaskAttemptID taskId, StatusReporter reporter) {
     super(conf, taskId.getJobID());
     this.taskId = taskId;
+    this.reporter = reporter;
   }
 
   /**
@@ -48,13 +58,6 @@ public class TaskAttemptContextImpl exte
   }
 
   /**
-   * Set the current status of the task to the given string.
-   */
-  public void setStatus(String msg) {
-    status = msg;
-  }
-
-  /**
    * Get the last set status message.
    * @return the current status message
    */
@@ -62,11 +65,47 @@ public class TaskAttemptContextImpl exte
     return status;
   }
 
+  @Override
+  public Counter getCounter(Enum<?> counterName) {
+    return reporter.getCounter(counterName);
+  }
+
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    return reporter.getCounter(groupName, counterName);
+  }
+
+  /**
+   * Report progress.
+   */
+  @Override
+  public void progress() {
+    reporter.progress();
+  }
+
+  protected void setStatusString(String status) {
+    this.status = status;
+  }
+
   /**
-   * Report progress. The subtypes actually do work in this method.
+   * Set the current status of the task to the given string.
    */
   @Override
-  public void progress() { 
+  public void setStatus(String status) {
+    setStatusString(status);
+    reporter.setStatus(status);
+  }
+
+  public static class DummyReporter extends StatusReporter {
+    public void setStatus(String s) {
+    }
+    public void progress() {
+    }
+    public Counter getCounter(Enum<?> name) {
+      return new Counters().findCounter(name);
+    }
+    public Counter getCounter(String group, String name) {
+      return new Counters().findCounter(group, name);
+    }
   }
-    
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java?rev=1033815&r1=1033814&r2=1033815&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java Thu Nov 11 04:07:11 2010
@@ -23,7 +23,6 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.StatusReporter;
@@ -46,16 +45,14 @@ public abstract class TaskInputOutputCon
        extends TaskAttemptContextImpl 
        implements TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
   private RecordWriter<KEYOUT,VALUEOUT> output;
-  private StatusReporter reporter;
   private OutputCommitter committer;
 
   public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
                                     RecordWriter<KEYOUT,VALUEOUT> output,
                                     OutputCommitter committer,
                                     StatusReporter reporter) {
-    super(conf, taskid);
+    super(conf, taskid, reporter);
     this.output = output;
-    this.reporter = reporter;
     this.committer = committer;
   }
 
@@ -92,14 +89,6 @@ public abstract class TaskInputOutputCon
     output.write(key, value);
   }
 
-  public Counter getCounter(Enum<?> counterName) {
-    return reporter.getCounter(counterName);
-  }
-
-  public Counter getCounter(String groupName, String counterName) {
-    return reporter.getCounter(groupName, counterName);
-  }
-
   public OutputCommitter getOutputCommitter() {
     return committer;
   }