Posted to common-commits@hadoop.apache.org by om...@apache.org on 2009/03/19 01:00:58 UTC

svn commit: r755792 - in /hadoop/core/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/

Author: omalley
Date: Thu Mar 19 00:00:58 2009
New Revision: 755792

URL: http://svn.apache.org/viewvc?rev=755792&view=rev
Log:
HADOOP-5382. Support combiners in the new context object API. (omalley)
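In short: before this change the map-side spill and reduce-side merge code could only run a combiner written against the old org.apache.hadoop.mapred.Reducer interface; the CombinerRunner added to Task.java below dispatches to either an old-API or a new context-object combiner, so jobs written against the new API can now set one. The sketch below is illustrative only and not part of the commit: the class names and paths are placeholders, and waitForCompletion() is called without arguments to match the test updated at the end of this diff.

// Hedged, illustrative sketch of a pure new-API job that uses a combiner
// after HADOOP-5382. Not part of this commit; names are placeholders.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiWordCount {

  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // The same new-API Reducer serves as both combiner and reducer.
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) {
        sum += v.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "word count");
    job.setJarByClass(NewApiWordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // Honored by the new-API task code once this change is applied.
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion() ? 0 : 1);
  }
}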

Modified:
    hadoop/core/branches/branch-0.20/   (props changed)
    hadoop/core/branches/branch-0.20/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 19 00:00:58 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu Mar 19 00:00:58 2009
@@ -642,8 +642,9 @@
     HADOOP-5255. Fix use of Math.abs to avoid overflow. (Jonathan Ellis via
     cdouglas)
 
-    HADOOP-5269. Fixes a problem to do with tasktracker holding on to FAILED_UNCLEAN
-    or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu via ddas) 
+    HADOOP-5269. Fixes a problem to do with tasktracker holding on to 
+    FAILED_UNCLEAN or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu
+    via ddas) 
 
     HADOOP-5214. Fixes a ConcurrentModificationException while the Fairshare
     Scheduler accesses the tasktrackers stored by the JobTracker.
@@ -651,37 +652,41 @@
 
     HADOOP-5233. Addresses the three issues - Race condition in updating
     status, NPE in TaskTracker task localization when the conf file is missing
-    (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task (HADOOP-5235).
-    (Amareshwari Sriramadasu via ddas)
+    (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task 
+    (HADOOP-5235). (Amareshwari Sriramadasu via ddas)
 
     HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when
-    a job finishes. This fixes a bunch of problems to do with NPE when a completed
-    job is not in memory and a tasktracker comes to the jobtracker with a status
-    report of a task belonging to that job. (Amar Kamat via ddas)
-
-    HADOOP-5282. Fixed job history logs for task attempts that are failed by the
-    JobTracker, say due to lost task trackers. (Amar Kamat via yhemanth)
+    a job finishes. This fixes a bunch of problems to do with NPE when a 
+    completed job is not in memory and a tasktracker comes to the jobtracker 
+    with a status report of a task belonging to that job. (Amar Kamat via ddas)
+
+    HADOOP-5282. Fixed job history logs for task attempts that are
+    failed by the JobTracker, say due to lost task trackers. (Amar
+    Kamat via yhemanth)
 
     HADOOP-4963. Fixes a logging to do with getting the location of
     map output file. (Amareshwari Sriramadasu via ddas)
     
     HADOOP-5292. Fix NPE in KFS::getBlockLocations. (Sriram Rao via lohit)    
 
-    HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes the estimation
-    formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
+    HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes
+    the estimation formula linear where blowUp =
+    Total-Output/Total-Input. (Sharad Agarwal via ddas)
 
     HADOOP-5142. Fix MapWritable#putAll to store key/value classes. 
     (Doğacan Güney via enis)
 
-    HADOOP-4744. Workaround for jetty6 returning -1 when getLocalPort is invoked on
-    the connector. The workaround patch retries a few times before failing.
-    (Jothi Padmanabhan via yhemanth)
-
-    HADOOP-5280. Adds a check to prevent a task state transition from FAILED to any of
-    UNASSIGNED, RUNNING, COMMIT_PENDING or SUCCEEDED. (ddas) 
-
-    HADOOP-5272. Fixes a problem to do with detecting whether an attempt is the first
-    attempt of a Task. This affects JobTracker restart. (Amar Kamat via ddas)
+    HADOOP-4744. Workaround for jetty6 returning -1 when getLocalPort
+    is invoked on the connector. The workaround patch retries a few
+    times before failing.  (Jothi Padmanabhan via yhemanth)
+
+    HADOOP-5280. Adds a check to prevent a task state transition from
+    FAILED to any of UNASSIGNED, RUNNING, COMMIT_PENDING or
+    SUCCEEDED. (ddas)
+
+    HADOOP-5272. Fixes a problem to do with detecting whether an
+    attempt is the first attempt of a Task. This affects JobTracker
+    restart. (Amar Kamat via ddas)
 
     HADOOP-5306. Fixes a problem to do with logging/parsing the http port of a 
     lost tracker. Affects JobTracker restart. (Amar Kamat via ddas)
@@ -690,12 +695,13 @@
 
     HADOOP-5274. Fix gridmix2 dependency on wordcount example. (cdouglas)
 
-    HADOOP-5145. Balancer sometimes runs out of memory after running days or weeks.
-    (hairong)
+    HADOOP-5145. Balancer sometimes runs out of memory after running
+    days or weeks.  (hairong)
 
-    HADOOP-5338. Fix jobtracker restart to clear task completion events cached by
-    tasktrackers forcing them to fetch all events afresh, thus avoiding missed
-    task completion events on the tasktrackers. (Amar Kamat via yhemanth)
+    HADOOP-5338. Fix jobtracker restart to clear task completion
+    events cached by tasktrackers forcing them to fetch all events
+    afresh, thus avoiding missed task completion events on the
+    tasktrackers. (Amar Kamat via yhemanth)
 
     HADOOP-4695. Change TestGlobalFilter so that it allows a web page to be
     filtered more than once for a single access.  (Kan Zhang via szetszwo) 
@@ -716,28 +722,32 @@
     HADOOP-5395. Change the exception message when a job is submitted to an
     invalid queue. (Rahul Kumar Singh via yhemanth)
 
-    HADOOP-5276. Fixes a problem to do with updating the start time of a task when
-    the tracker that ran the task is lost. (Amar Kamat via ddas)
+    HADOOP-5276. Fixes a problem to do with updating the start time of
+    a task when the tracker that ran the task is lost. (Amar Kamat via
+    ddas)
 
-    HADOOP-5278. Fixes a problem to do with logging the finish time of a task 
-    during recovery (after a JobTracker restart). (Amar Kamat via ddas)
+    HADOOP-5278. Fixes a problem to do with logging the finish time of
+    a task during recovery (after a JobTracker restart). (Amar Kamat
+    via ddas)
 
-    HADOOP-5490. Fixes a synchronization problem in the EagerTaskInitializationListener
-    class. (Jothi Padmanabhan via ddas)
+    HADOOP-5490. Fixes a synchronization problem in the
+    EagerTaskInitializationListener class. (Jothi Padmanabhan via
+    ddas)
 
-    HADOOP-5493. The shuffle copier threads return the codecs back to the pool when the
-    shuffle completes. (Jothi Padmanabhan via ddas)
+    HADOOP-5493. The shuffle copier threads return the codecs back to
+    the pool when the shuffle completes. (Jothi Padmanabhan via ddas)
 
     HADOOP-5505. Fix JspHelper initialization in the context of
     MiniDFSCluster. (Raghu Angadi)
 
-    HADOOP-5414. Fixes IO exception while executing hadoop fs -touchz fileName by
-    making sure that lease renewal thread exits before dfs client exits.
-    (hairong)
-
-    HADOOP-5103. FileInputFormat now reuses the clusterMap network topology object
-    and that brings down the log messages in the JobClient to do with
-    NetworkTopology.add significantly. (Jothi Padmanabhan via ddas)
+    HADOOP-5414. Fixes IO exception while executing hadoop fs -touchz
+    fileName by making sure that lease renewal thread exits before dfs
+    client exits.  (hairong)
+
+    HADOOP-5103. FileInputFormat now reuses the clusterMap network
+    topology object and that brings down the log messages in the
+    JobClient to do with NetworkTopology.add significantly. (Jothi
+    Padmanabhan via ddas)
 
     HADOOP-5483. Fixes a problem in the Directory Cleanup Thread due to which
     TestMiniMRWithDFS sometimes used to fail. (ddas) 
@@ -751,9 +761,11 @@
     HADOOP-5514. Fix JobTracker metrics and add metrics for wating, failed
     tasks. (cdouglas)
 
-    HADOOP-5516. Fix NullPointerException in TaskMemoryManagerThread that comes when
-    monitored processes disappear when the thread is running.
-    (Vinod Kumar Vavilapalli via yhemanth)
+    HADOOP-5516. Fix NullPointerException in TaskMemoryManagerThread
+    that comes when monitored processes disappear when the thread is
+    running.  (Vinod Kumar Vavilapalli via yhemanth)
+
+    HADOOP-5382. Support combiners in the new context object API. (omalley)
 
 Release 0.19.2 - Unreleased
 

Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 19 00:00:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu Mar 19 00:00:58 2009
@@ -312,7 +312,9 @@
   void runOldMapper(final JobConf job,
                     final BytesWritable rawSplit,
                     final TaskUmbilicalProtocol umbilical,
-                    TaskReporter reporter) throws IOException {
+                    TaskReporter reporter
+                    ) throws IOException, InterruptedException,
+                             ClassNotFoundException {
     InputSplit inputSplit = null;
     // reinstantiate the split
     try {
@@ -429,7 +431,7 @@
     NewOutputCollector(JobConf job, 
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
-                       ) throws IOException {
+                       ) throws IOException, ClassNotFoundException {
       collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
     }
 
@@ -439,8 +441,13 @@
     }
 
     @Override
-    public void close(TaskAttemptContext context) throws IOException {
-      collector.flush();
+    public void close(TaskAttemptContext context
+                      ) throws IOException,InterruptedException {
+      try {
+        collector.flush();
+      } catch (ClassNotFoundException cnf) {
+        throw new IOException("can't find class ", cnf);
+      }
       collector.close();
     }
   }
@@ -525,9 +532,10 @@
   interface MapOutputCollector<K, V>
     extends OutputCollector<K, V> {
 
-    public void close() throws IOException;
+    public void close() throws IOException, InterruptedException;
     
-    public void flush() throws IOException;
+    public void flush() throws IOException, InterruptedException, 
+                               ClassNotFoundException;
         
   }
 
@@ -559,7 +567,8 @@
 
     }
 
-    public void flush() throws IOException {
+    public void flush() throws IOException, InterruptedException, 
+                               ClassNotFoundException {
     }
 
     public void collect(K key, V value) throws IOException {
@@ -582,7 +591,7 @@
     private final SerializationFactory serializationFactory;
     private final Serializer<K> keySerializer;
     private final Serializer<V> valSerializer;
-    private final Class<? extends Reducer> combinerClass;
+    private final CombinerRunner<K,V> combinerRunner;
     private final CombineOutputCollector<K, V> combineCollector;
     
     // Compression for map-outputs
@@ -627,7 +636,6 @@
    
     private final Counters.Counter mapOutputByteCounter;
     private final Counters.Counter mapOutputRecordCounter;
-    private final Counters.Counter combineInputCounter;
     private final Counters.Counter combineOutputCounter;
     
     private ArrayList<SpillRecord> indexCacheList;
@@ -636,7 +644,8 @@
 
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
-                           TaskReporter reporter) throws IOException {
+                           TaskReporter reporter
+                           ) throws IOException, ClassNotFoundException {
       this.job = job;
       this.reporter = reporter;
       localFs = FileSystem.getLocal(job);
@@ -688,7 +697,8 @@
       // counters
       mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
       mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
-      combineInputCounter = reporter.getCounter(COMBINE_INPUT_RECORDS);
+      Counters.Counter combineInputCounter = 
+        reporter.getCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
       // compression
       if (job.getCompressMapOutput()) {
@@ -697,10 +707,14 @@
         codec = ReflectionUtils.newInstance(codecClass, job);
       }
       // combiner
-      combinerClass = job.getCombinerClass();
-      combineCollector = (null != combinerClass)
-        ? new CombineOutputCollector<K,V>(combineOutputCounter)
-        : null;
+      combinerRunner = CombinerRunner.create(job, getTaskID(), 
+                                             combineInputCounter,
+                                             reporter, null);
+      if (combinerRunner != null) {
+        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
+      } else {
+        combineCollector = null;
+      }
       minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
       spillThread.setDaemon(true);
       spillThread.setName("SpillThread");
@@ -995,7 +1009,8 @@
       }
     }
 
-    public synchronized void flush() throws IOException {
+    public synchronized void flush() throws IOException, ClassNotFoundException,
+                                            InterruptedException {
       LOG.info("Starting flush of map output");
       spillLock.lock();
       try {
@@ -1085,7 +1100,8 @@
       spillReady.signal();
     }
 
-    private void sortAndSpill() throws IOException {
+    private void sortAndSpill() throws IOException, ClassNotFoundException,
+                                       InterruptedException {
       //approximate the length of the output file to be the length of the
       //buffer + header lengths for the partitions
       long size = (bufend >= bufstart
@@ -1113,7 +1129,7 @@
             long segmentStart = out.getPos();
             writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                       spilledRecordsCounter);
-            if (null == combinerClass) {
+            if (combinerRunner == null) {
               // spill directly
               DataInputBuffer key = new DataInputBuffer();
               while (spindex < endPosition &&
@@ -1140,7 +1156,7 @@
                 combineCollector.setWriter(writer);
                 RawKeyValueIterator kvIter =
                   new MRResultIterator(spstart, spindex);
-                combineAndSpill(kvIter, combineInputCounter);
+                combinerRunner.combine(kvIter, combineCollector);
               }
             }
 
@@ -1257,25 +1273,6 @@
       vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
     }
 
-    @SuppressWarnings("unchecked")
-    private void combineAndSpill(RawKeyValueIterator kvIter,
-        Counters.Counter inCounter) throws IOException {
-      Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
-      try {
-        CombineValuesIterator<K, V> values = new CombineValuesIterator<K, V>(
-            kvIter, comparator, keyClass, valClass, job, reporter,
-            inCounter);
-        while (values.more()) {
-          combiner.reduce(values.getKey(), values, combineCollector, reporter);
-          values.nextKey();
-          // indicate we're making progress
-          reporter.progress();
-        }
-      } finally {
-        combiner.close();
-      }
-    }
-
     /**
      * Inner class wrapping valuebytes, used for appendRaw.
      */
@@ -1329,7 +1326,8 @@
       public void close() { }
     }
 
-    private void mergeParts() throws IOException {
+    private void mergeParts() throws IOException, InterruptedException, 
+                                     ClassNotFoundException {
       // get the approximate size of the final output/index files
       long finalOutFileSize = 0;
       long finalIndexFileSize = 0;
@@ -1428,11 +1426,11 @@
           Writer<K, V> writer =
               new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                                spilledRecordsCounter);
-          if (null == combinerClass || numSpills < minSpillsForCombine) {
+          if (combinerRunner == null || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter, job);
           } else {
             combineCollector.setWriter(writer);
-            combineAndSpill(kvIter, combineInputCounter);
+            combinerRunner.combine(kvIter, combineCollector);
           }
 
           //close

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Mar 19 00:00:58 2009
@@ -119,8 +119,6 @@
     getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter = 
     getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
-  private Counters.Counter reduceCombineInputCounter =
-    getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
   private Counters.Counter reduceCombineOutputCounter =
     getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
 
@@ -518,7 +516,7 @@
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runNewReducer(JobConf job,
                      final TaskUmbilicalProtocol umbilical,
-                     final Reporter reporter,
+                     final TaskReporter reporter,
                      RawKeyValueIterator rIter,
                      RawComparator<INKEY> comparator,
                      Class<INKEY> keyClass,
@@ -536,39 +534,14 @@
       (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
         outputFormat.getRecordWriter(taskContext);
     job.setBoolean("mapred.skip.on", isSkipping());
-    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
-         reducerContext = null;
-    try {
-      Constructor<org.apache.hadoop.mapreduce.Reducer.Context> contextConstructor =
-        org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
-        (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
-            Configuration.class,
-            org.apache.hadoop.mapreduce.TaskAttemptID.class,
-            RawKeyValueIterator.class,
-            org.apache.hadoop.mapreduce.RecordWriter.class,
-            org.apache.hadoop.mapreduce.OutputCommitter.class,
-            org.apache.hadoop.mapreduce.StatusReporter.class,
-            RawComparator.class,
-            Class.class,
-            Class.class});
-
-      reducerContext = contextConstructor.newInstance(reducer, job, 
-                                                      getTaskID(),
-                                                      rIter, output, committer,
-                                                      reporter, comparator, 
-                                                      keyClass, valueClass);
-
-      reducer.run(reducerContext);
-      output.close(reducerContext);
-    } catch (NoSuchMethodException e) {
-      throw new IOException("Can't find Context constructor", e);
-    } catch (InstantiationException e) {
-      throw new IOException("Can't create Context", e);
-    } catch (InvocationTargetException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    }
+    org.apache.hadoop.mapreduce.Reducer.Context 
+         reducerContext = createReduceContext(reducer, job, getTaskID(),
+                                               rIter, reduceInputValueCounter, 
+                                               output, committer,
+                                               reporter, comparator, keyClass,
+                                               valueClass);
+    reducer.run(reducerContext);
+    output.close(reducerContext);
   }
 
   class ReduceCopier<K, V> implements MRConstants {
@@ -722,14 +695,14 @@
     private volatile int maxFetchRetriesPerMap;
     
     /**
-     * Combiner class to run during in-memory merge, if defined.
+     * Combiner runner, if a combiner is needed
      */
-    private final Class<? extends Reducer> combinerClass;
+    private CombinerRunner combinerRunner;
 
     /**
      * Resettable collector used for combine.
      */
-    private final CombineOutputCollector combineCollector;
+    private CombineOutputCollector combineCollector = null;
 
     /**
      * Maximum percent of failed fetch attempt before killing the reduce task.
@@ -1680,7 +1653,8 @@
     }
     
     public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
-                        TaskReporter reporter)throws IOException {
+                        TaskReporter reporter
+                        )throws ClassNotFoundException, IOException {
       
       configureClasspath(conf);
       this.reporter = reporter;
@@ -1693,10 +1667,15 @@
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxInFlight = 4 * numCopiers;
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
-      this.combinerClass = conf.getCombinerClass();
-      combineCollector = (null != combinerClass)
-        ? new CombineOutputCollector(reduceCombineOutputCounter)
-        : null;
+      Counters.Counter combineInputCounter = 
+        reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
+      this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
+                                                  combineInputCounter,
+                                                  reporter, null);
+      if (combinerRunner != null) {
+        combineCollector = 
+          new CombineOutputCollector(reduceCombineOutputCounter);
+      }
       
       this.ioSortFactor = conf.getInt("io.sort.factor", 10);
       // the exponential backoff formula
@@ -2507,11 +2486,11 @@
                                conf.getOutputKeyComparator(), reporter,
                                spilledRecordsCounter, null);
           
-          if (null == combinerClass) {
+          if (combinerRunner == null) {
             Merger.writeFile(rIter, writer, reporter, conf);
           } else {
             combineCollector.setWriter(writer);
-            combineAndSpill(rIter, reduceCombineInputCounter);
+            combinerRunner.combine(rIter, combineCollector);
           }
           writer.close();
 
@@ -2536,29 +2515,6 @@
       }
     }
 
-    @SuppressWarnings("unchecked")
-    private void combineAndSpill(
-        RawKeyValueIterator kvIter,
-        Counters.Counter inCounter) throws IOException {
-      JobConf job = (JobConf)getConf();
-      Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
-      Class keyClass = job.getMapOutputKeyClass();
-      Class valClass = job.getMapOutputValueClass();
-      RawComparator comparator = job.getOutputKeyComparator();
-      try {
-        CombineValuesIterator values = new CombineValuesIterator(
-            kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
-            inCounter);
-        while (values.more()) {
-          combiner.reduce(values.getKey(), values, combineCollector,
-              Reporter.NULL);
-          values.nextKey();
-        }
-      } finally {
-        combiner.close();
-      }
-    }
-
     private class GetMapEventsThread extends Thread {
       
       private IntWritable fromEventId = new IntWritable(0);

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Thu Mar 19 00:00:58 2009
@@ -21,6 +21,8 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -986,4 +989,208 @@
     }
   }
 
+  private static final Constructor<org.apache.hadoop.mapreduce.Reducer.Context> 
+  contextConstructor;
+  static {
+    try {
+      contextConstructor = 
+        org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
+        (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
+            Configuration.class,
+            org.apache.hadoop.mapreduce.TaskAttemptID.class,
+            RawKeyValueIterator.class,
+            org.apache.hadoop.mapreduce.Counter.class,
+            org.apache.hadoop.mapreduce.RecordWriter.class,
+            org.apache.hadoop.mapreduce.OutputCommitter.class,
+            org.apache.hadoop.mapreduce.StatusReporter.class,
+            RawComparator.class,
+            Class.class,
+            Class.class});
+    } catch (NoSuchMethodException nme) {
+      throw new IllegalArgumentException("Can't find constructor");
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> 
+  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+  createReduceContext(org.apache.hadoop.mapreduce.Reducer
+                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
+                      Configuration job,
+                      org.apache.hadoop.mapreduce.TaskAttemptID taskId, 
+                      RawKeyValueIterator rIter,
+                      org.apache.hadoop.mapreduce.Counter inputCounter,
+                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
+                      org.apache.hadoop.mapreduce.OutputCommitter committer,
+                      org.apache.hadoop.mapreduce.StatusReporter reporter,
+                      RawComparator<INKEY> comparator,
+                      Class<INKEY> keyClass, Class<INVALUE> valueClass
+  ) throws IOException, ClassNotFoundException {
+    try {
+
+      return contextConstructor.newInstance(reducer, job, taskId,
+                                            rIter, inputCounter, output, 
+                                            committer, reporter, comparator, 
+                                            keyClass, valueClass);
+    } catch (InstantiationException e) {
+      throw new IOException("Can't create Context", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    }
+  }
+
+  protected static abstract class CombinerRunner<K,V> {
+    protected final Counters.Counter inputCounter;
+    protected final JobConf job;
+    protected final TaskReporter reporter;
+
+    CombinerRunner(Counters.Counter inputCounter,
+                   JobConf job,
+                   TaskReporter reporter) {
+      this.inputCounter = inputCounter;
+      this.job = job;
+      this.reporter = reporter;
+    }
+    
+    /**
+     * Run the combiner over a set of inputs.
+     * @param iterator the key/value pairs to use as input
+     * @param collector the output collector
+     */
+    abstract void combine(RawKeyValueIterator iterator, 
+                          OutputCollector<K,V> collector
+                         ) throws IOException, InterruptedException, 
+                                  ClassNotFoundException;
+
+    static <K,V> 
+    CombinerRunner<K,V> create(JobConf job,
+                               TaskAttemptID taskId,
+                               Counters.Counter inputCounter,
+                               TaskReporter reporter,
+                               org.apache.hadoop.mapreduce.OutputCommitter committer
+                              ) throws ClassNotFoundException {
+      Class<? extends Reducer<K,V,K,V>> cls = 
+        (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
+      if (cls != null) {
+        return new OldCombinerRunner(cls, job, inputCounter, reporter);
+      }
+      // make a task context so we can get the classes
+      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new org.apache.hadoop.mapreduce.TaskAttemptContext(job, taskId);
+      Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls = 
+        (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
+           taskContext.getCombinerClass();
+      if (newcls != null) {
+        return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext, 
+                                          inputCounter, reporter, committer);
+      }
+      
+      return null;
+    }
+  }
+  
+  protected static class OldCombinerRunner<K,V> extends CombinerRunner<K,V> {
+    private final Class<? extends Reducer<K,V,K,V>> combinerClass;
+    private final Class<K> keyClass;
+    private final Class<V> valueClass;
+    private final RawComparator<K> comparator;
+
+    protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls,
+                                JobConf conf,
+                                Counters.Counter inputCounter,
+                                TaskReporter reporter) {
+      super(inputCounter, conf, reporter);
+      combinerClass = cls;
+      keyClass = (Class<K>) job.getMapOutputKeyClass();
+      valueClass = (Class<V>) job.getMapOutputValueClass();
+      comparator = (RawComparator<K>) job.getOutputKeyComparator();
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void combine(RawKeyValueIterator kvIter,
+                           OutputCollector<K,V> combineCollector
+                           ) throws IOException {
+      Reducer<K,V,K,V> combiner = 
+        ReflectionUtils.newInstance(combinerClass, job);
+      try {
+        CombineValuesIterator<K,V> values = 
+          new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, 
+                                         valueClass, job, Reporter.NULL,
+                                         inputCounter);
+        while (values.more()) {
+          combiner.reduce(values.getKey(), values, combineCollector,
+              Reporter.NULL);
+          values.nextKey();
+        }
+      } finally {
+        combiner.close();
+      }
+    }
+  }
+  
+  protected static class NewCombinerRunner<K, V> extends CombinerRunner<K,V> {
+    private final Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> 
+        reducerClass;
+    private final org.apache.hadoop.mapreduce.TaskAttemptID taskId;
+    private final RawComparator<K> comparator;
+    private final Class<K> keyClass;
+    private final Class<V> valueClass;
+    private final org.apache.hadoop.mapreduce.OutputCommitter committer;
+
+    NewCombinerRunner(Class reducerClass,
+                      JobConf job,
+                      org.apache.hadoop.mapreduce.TaskAttemptID taskId,
+                      org.apache.hadoop.mapreduce.TaskAttemptContext context,
+                      Counters.Counter inputCounter,
+                      TaskReporter reporter,
+                      org.apache.hadoop.mapreduce.OutputCommitter committer) {
+      super(inputCounter, job, reporter);
+      this.reducerClass = reducerClass;
+      this.taskId = taskId;
+      keyClass = (Class<K>) context.getMapOutputKeyClass();
+      valueClass = (Class<V>) context.getMapOutputValueClass();
+      comparator = (RawComparator<K>) context.getSortComparator();
+      this.committer = committer;
+    }
+
+    private static class OutputConverter<K,V>
+            extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
+      OutputCollector<K,V> output;
+      OutputConverter(OutputCollector<K,V> output) {
+        this.output = output;
+      }
+
+      @Override
+      public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context){
+      }
+
+      @Override
+      public void write(K key, V value
+                        ) throws IOException, InterruptedException {
+        output.collect(key,value);
+      }
+    }
+
+    @Override
+    void combine(RawKeyValueIterator iterator, 
+                 OutputCollector<K,V> collector
+                 ) throws IOException, InterruptedException,
+                          ClassNotFoundException {
+      // make a reducer
+      org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
+        (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
+          ReflectionUtils.newInstance(reducerClass, job);
+      org.apache.hadoop.mapreduce.Reducer.Context 
+           reducerContext = createReduceContext(reducer, job, taskId,
+                                                iterator, inputCounter, 
+                                                new OutputConverter(collector),
+                                                committer,
+                                                reporter, comparator, keyClass,
+                                                valueClass);
+      reducer.run(reducerContext);
+    }
+    
+  }
 }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Job.java Thu Mar 19 00:00:58 2009
@@ -365,7 +365,7 @@
    * @return the counters for this job.
    * @throws IOException
    */
-  public Iterable<CounterGroup> getCounters() throws IOException {
+  public Counters getCounters() throws IOException {
     ensureState(JobState.RUNNING);
     return new Counters(info.getCounters());
   }
@@ -385,7 +385,6 @@
     int numReduces = conf.getNumReduceTasks();
     String oldMapperClass = "mapred.mapper.class";
     String oldReduceClass = "mapred.reducer.class";
-    String oldCombineClass = "mapred.combiner.class";
     conf.setBooleanIfUnset("mapred.mapper.new-api",
                            conf.get(oldMapperClass) == null);
     if (conf.getUseNewMapper()) {
@@ -393,7 +392,6 @@
       ensureNotSet("mapred.input.format.class", mode);
       ensureNotSet(oldMapperClass, mode);
       if (numReduces != 0) {
-        ensureNotSet(oldCombineClass, mode);
         ensureNotSet("mapred.partitioner.class", mode);
        } else {
         ensureNotSet("mapred.output.format.class", mode);
@@ -403,7 +401,6 @@
       ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
       ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
       if (numReduces != 0) {
-        ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);
         ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
        } else {
         ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
@@ -416,12 +413,10 @@
         String mode = "new reduce API";
         ensureNotSet("mapred.output.format.class", mode);
         ensureNotSet(oldReduceClass, mode);   
-        ensureNotSet(oldCombineClass, mode);
       } else {
         String mode = "reduce compatability";
         ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
         ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);   
-        ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);        
       }
     }   
   }
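A related user-visible tweak in the Job.java hunk above: getCounters() now returns a Counters object instead of Iterable<CounterGroup>, which is what lets the updated TestMapReduceLocal below look up individual counters by group and name. A small illustrative sketch, assuming job is a completed new-API Job; the group and counter names mirror the test:

// Illustrative only (not part of this commit): inspecting combiner activity
// through the Counters object now returned by Job.getCounters().
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public class CombinerStats {
  // Counter group name mirrors the one used in TestMapReduceLocal below.
  private static final String COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";

  /** Reports how much the combiner shrank the map output of a finished job. */
  public static void report(Job job) throws Exception {
    Counters ctrs = job.getCounters();
    long combineIn =
        ctrs.findCounter(COUNTER_GROUP, "COMBINE_INPUT_RECORDS").getValue();
    long combineOut =
        ctrs.findCounter(COUNTER_GROUP, "COMBINE_OUTPUT_RECORDS").getValue();
    System.out.println("combine input records:  " + combineIn);
    System.out.println("combine output records: " + combineOut);
  }
}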

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Thu Mar 19 00:00:58 2009
@@ -41,6 +41,7 @@
 public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   private RawKeyValueIterator input;
+  private Counter inputCounter;
   private RawComparator<KEYIN> comparator;
   private KEYIN key;                                  // current key
   private VALUEIN value;                              // current value
@@ -56,6 +57,7 @@
 
   public ReduceContext(Configuration conf, TaskAttemptID taskid,
                        RawKeyValueIterator input, 
+                       Counter inputCounter,
                        RecordWriter<KEYOUT,VALUEOUT> output,
                        OutputCommitter committer,
                        StatusReporter reporter,
@@ -65,6 +67,7 @@
                        ) throws InterruptedException, IOException{
     super(conf, taskid, output, committer, reporter);
     this.input = input;
+    this.inputCounter = inputCounter;
     this.comparator = comparator;
     SerializationFactory serializationFactory = new SerializationFactory(conf);
     this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
@@ -106,6 +109,7 @@
     buffer.reset(next.getData(), next.getPosition(), next.getLength());
     value = valueDeserializer.deserialize(value);
     hasMore = input.next();
+    inputCounter.increment(1);
     if (hasMore) {
       next = input.getKey();
       nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Thu Mar 19 00:00:58 2009
@@ -121,6 +121,7 @@
     extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
     public Context(Configuration conf, TaskAttemptID taskid,
                    RawKeyValueIterator input, 
+                   Counter inputCounter,
                    RecordWriter<KEYOUT,VALUEOUT> output,
                    OutputCommitter committer,
                    StatusReporter reporter,
@@ -128,8 +129,8 @@
                    Class<KEYIN> keyClass,
                    Class<VALUEIN> valueClass
                    ) throws IOException, InterruptedException {
-      super(conf, taskid, input, output, committer, reporter, comparator, 
-            keyClass, valueClass);
+      super(conf, taskid, input, inputCounter, output, committer, reporter, 
+            comparator, keyClass, valueClass);
     }
   }
 

Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=755792&r1=755791&r2=755792&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Thu Mar 19 00:00:58 2009
@@ -96,9 +96,10 @@
                             ) throws IOException,
                                      InterruptedException,
                                      ClassNotFoundException {
+    final String COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
     localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
     localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);    
-    writeFile("in/part1", "this is a test\nof word count\n");
+    writeFile("in/part1", "this is a test\nof word count test\ntest\n");
     writeFile("in/part2", "more test");
     Job job = new Job(conf, "word count");     
     job.setJarByClass(WordCount.class);
@@ -112,8 +113,21 @@
     assertTrue(job.waitForCompletion());
     String out = readFile("out/part-r-00000");
     System.out.println(out);
-    assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t2\nthis\t1\nword\t1\n",
+    assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
                  out);
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+    long combineIn = ctrs.findCounter(COUNTER_GROUP,
+                                      "COMBINE_INPUT_RECORDS").getValue();
+    long combineOut = ctrs.findCounter(COUNTER_GROUP, 
+                                       "COMBINE_OUTPUT_RECORDS").getValue();
+    long reduceIn = ctrs.findCounter(COUNTER_GROUP,
+                                     "REDUCE_INPUT_RECORDS").getValue();
+    long mapOut = ctrs.findCounter(COUNTER_GROUP, 
+                                   "MAP_OUTPUT_RECORDS").getValue();
+    assertEquals("map out = combine in", mapOut, combineIn);
+    assertEquals("combine out = reduce in", combineOut, reduceIn);
+    assertTrue("combine in > combine out", combineIn > combineOut);
   }
 
   private void runSecondarySort(Configuration conf) throws IOException,