You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2014/01/06 19:35:27 UTC

svn commit: r1555968 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoo...

Author: tucu
Date: Mon Jan  6 18:35:26 2014
New Revision: 1555968

URL: http://svn.apache.org/r1555968
Log:
MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jan  6 18:35:26 2014
@@ -196,6 +196,8 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
     with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
 
+    MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Mon Jan  6 18:35:26 2014
@@ -949,6 +949,23 @@ public class JobConf extends Configurati
     return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
   }
 
+  /**
+   * Get the user defined {@link RawComparator} comparator for
+   * grouping keys of inputs to the combiner.
+   *
+   * @return comparator set by the user for grouping values.
+   * @see #setCombinerKeyGroupingComparator(Class) for details.
+   */
+  public RawComparator getCombinerKeyGroupingComparator() {
+    Class<? extends RawComparator> theClass = getClass(
+        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
+    if (theClass == null) {
+      return getOutputKeyComparator();
+    }
+
+    return ReflectionUtils.newInstance(theClass, this);
+  }
+
   /** 
    * Get the user defined {@link WritableComparable} comparator for 
    * grouping keys of inputs to the reduce.
@@ -966,6 +983,37 @@ public class JobConf extends Configurati
     return ReflectionUtils.newInstance(theClass, this);
   }
 
+  /**
+   * Set the user defined {@link RawComparator} comparator for
+   * grouping keys in the input to the combiner.
+   * <p/>
+   * <p>This comparator should be provided if the equivalence rules for keys
+   * for sorting the intermediates are different from those for grouping keys
+   * before each call to
+   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
+   * <p/>
+   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
+   * in a single call to the combiner if K1 and K2 compare as equal.</p>
+   * <p/>
+   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
+   * how keys are sorted, this can be used in conjunction to simulate
+   * <i>secondary sort on values</i>.</p>
+   * <p/>
+   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
+   * <i>stable</i> in any sense. (In any case, with the order of available
+   * map-outputs to the combiner being non-deterministic, it wouldn't make
+   * that much sense.)</p>
+   *
+   * @param theClass the comparator class to be used for grouping keys for the
+   * combiner. It should implement <code>RawComparator</code>.
+   * @see #setOutputKeyComparatorClass(Class)
+   */
+  public void setCombinerKeyGroupingComparator(
+      Class<? extends RawComparator> theClass) {
+    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
+        theClass, RawComparator.class);
+  }
+
   /** 
    * Set the user defined {@link RawComparator} comparator for 
    * grouping keys in the input to the reduce.
@@ -989,7 +1037,9 @@ public class JobConf extends Configurati
    * 
    * @param theClass the comparator class to be used for grouping keys. 
    *                 It should implement <code>RawComparator</code>.
-   * @see #setOutputKeyComparatorClass(Class)                 
+   * @see #setOutputKeyComparatorClass(Class)
+   * @see #setCombinerKeyGroupingComparator(Class) for setting a
+   * comparator for the combiner.
    */
   public void setOutputValueGroupingComparator(
       Class<? extends RawComparator> theClass) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Mon Jan  6 18:35:26 2014
@@ -1575,7 +1575,8 @@ abstract public class Task implements Wr
       combinerClass = cls;
       keyClass = (Class<K>) job.getMapOutputKeyClass();
       valueClass = (Class<V>) job.getMapOutputValueClass();
-      comparator = (RawComparator<K>) job.getOutputKeyComparator();
+      comparator = (RawComparator<K>)
+          job.getCombinerKeyGroupingComparator();
     }
 
     @SuppressWarnings("unchecked")
@@ -1624,7 +1625,7 @@ abstract public class Task implements Wr
       this.taskId = taskId;
       keyClass = (Class<K>) context.getMapOutputKeyClass();
       valueClass = (Class<V>) context.getMapOutputValueClass();
-      comparator = (RawComparator<K>) context.getSortComparator();
+      comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
       this.committer = committer;
     }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Mon Jan  6 18:35:26 2014
@@ -949,10 +949,26 @@ public class Job extends JobContextImpl 
   }
 
   /**
+   * Define the comparator that controls which keys are grouped together
+   * for a single call to combiner,
+   * {@link Reducer#reduce(Object, Iterable,
+   * org.apache.hadoop.mapreduce.Reducer.Context)}
+   *
+   * @param cls the raw comparator to use
+   * @throws IllegalStateException if the job is submitted
+   */
+  public void setCombinerKeyGroupingComparatorClass(
+      Class<? extends RawComparator> cls) throws IllegalStateException {
+    ensureState(JobState.DEFINE);
+    conf.setCombinerKeyGroupingComparator(cls);
+  }
+
+  /**
    * Define the comparator that controls how the keys are sorted before they
    * are passed to the {@link Reducer}.
    * @param cls the raw comparator
    * @throws IllegalStateException if the job is submitted
+   * @see #setCombinerKeyGroupingComparatorClass(Class)
    */
   public void setSortComparatorClass(Class<? extends RawComparator> cls
                                      ) throws IllegalStateException {
@@ -967,6 +983,8 @@ public class Job extends JobContextImpl 
    *                       org.apache.hadoop.mapreduce.Reducer.Context)}
    * @param cls the raw comparator to use
    * @throws IllegalStateException if the job is submitted
+   * @see #setCombinerKeyGroupingComparatorClass(Class) for setting a
+   * comparator for the combiner.
    */
   public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                          ) throws IllegalStateException {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java Mon Jan  6 18:35:26 2014
@@ -167,13 +167,24 @@ public interface JobContext extends MRJo
    */
   public String getJar();
 
-  /** 
-   * Get the user defined {@link RawComparator} comparator for 
-   * grouping keys of inputs to the reduce.
-   * 
+  /**
+   * Get the user defined {@link RawComparator} comparator for
+   * grouping keys of inputs to the combiner.
+   *
    * @return comparator set by the user for grouping values.
-   * @see Job#setGroupingComparatorClass(Class) for details.  
+   * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
    */
+  public RawComparator<?> getCombinerKeyGroupingComparator();
+
+    /**
+     * Get the user defined {@link RawComparator} comparator for
+     * grouping keys of inputs to the reduce.
+     *
+     * @return comparator set by the user for grouping values.
+     * @see Job#setGroupingComparatorClass(Class) for details.
+     * @see #getCombinerKeyGroupingComparator() for getting the
+     * comparator for the combiner.
+     */
   public RawComparator<?> getGroupingComparator();
   
   /**

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Jan  6 18:35:26 2014
@@ -93,6 +93,8 @@ public interface MRJobConfig {
 
   public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
 
+  public static final String COMBINER_GROUP_COMPARATOR_CLASS = "mapreduce.job.combiner.group.comparator.class";
+
   public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
 
   public static final String WORKING_DIR = "mapreduce.job.working.dir";

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Mon Jan  6 18:35:26 2014
@@ -167,6 +167,11 @@ class ChainMapContextImpl<KEYIN, VALUEIN
   }
 
   @Override
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return base.getCombinerKeyGroupingComparator();
+  }
+
+  @Override
   public RawComparator<?> getGroupingComparator() {
     return base.getGroupingComparator();
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Mon Jan  6 18:35:26 2014
@@ -160,6 +160,11 @@ class ChainReduceContextImpl<KEYIN, VALU
   }
 
   @Override
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return base.getCombinerKeyGroupingComparator();
+  }
+
+  @Override
   public RawComparator<?> getGroupingComparator() {
     return base.getGroupingComparator();
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Mon Jan  6 18:35:26 2014
@@ -169,6 +169,11 @@ public class WrappedMapper<KEYIN, VALUEI
     }
 
     @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return mapContext.getCombinerKeyGroupingComparator();
+    }
+
+    @Override
     public RawComparator<?> getGroupingComparator() {
       return mapContext.getGroupingComparator();
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Mon Jan  6 18:35:26 2014
@@ -162,6 +162,11 @@ public class WrappedReducer<KEYIN, VALUE
     }
 
     @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return reduceContext.getCombinerKeyGroupingComparator();
+    }
+
+    @Override
     public RawComparator<?> getGroupingComparator() {
       return reduceContext.getGroupingComparator();
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Mon Jan  6 18:35:26 2014
@@ -252,6 +252,17 @@ public class JobContextImpl implements J
     return conf.getJar();
   }
 
+  /**
+   * Get the user defined {@link RawComparator} comparator for
+   * grouping keys of inputs to the combiner.
+   *
+   * @return comparator set by the user for grouping values.
+   * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
+   */
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return conf.getCombinerKeyGroupingComparator();
+  }
+
   /** 
    * Get the user defined {@link RawComparator} comparator for 
    * grouping keys of inputs to the reduce.

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1555968&r1=1555967&r2=1555968&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Mon Jan  6 18:35:26 2014
@@ -582,7 +582,7 @@ public class MergeManagerImpl<K, V> impl
     Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
     Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
     RawComparator<K> comparator = 
-      (RawComparator<K>)job.getOutputKeyComparator();
+      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
     try {
       CombineValuesIterator values = new CombineValuesIterator(
           kvIter, comparator, keyClass, valClass, job, Reporter.NULL,

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java?rev=1555968&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java Mon Jan  6 18:35:26 2014
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestOldCombinerGrouping {
+  private static String TEST_ROOT_DIR =
+      new File("build", UUID.randomUUID().toString()).getAbsolutePath();
+
+  public static class Map implements
+      Mapper<LongWritable, Text, Text, LongWritable> {
+    @Override
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      String v = value.toString();
+      String k = v.substring(0, v.indexOf(","));
+      v = v.substring(v.indexOf(",") + 1);
+      output.collect(new Text(k), new LongWritable(Long.parseLong(v)));
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(JobConf job) {
+    }
+  }
+
+  public static class Reduce implements
+      Reducer<Text, LongWritable, Text, LongWritable> {
+
+    @Override
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      LongWritable maxValue = null;
+      while (values.hasNext()) {
+        LongWritable value = values.next();
+        if (maxValue == null) {
+          maxValue = value;
+        } else if (value.compareTo(maxValue) > 0) {
+          maxValue = value;
+        }
+      }
+      output.collect(key, maxValue);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(JobConf job) {
+    }
+  }
+
+  public static class Combiner extends Reduce {
+  }
+
+  public static class GroupComparator implements RawComparator<Text> {
+    @Override
+    public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
+        int i4) {
+      byte[] b1 = new byte[i2];
+      System.arraycopy(bytes, i, b1, 0, i2);
+
+      byte[] b2 = new byte[i4];
+      System.arraycopy(bytes2, i3, b2, 0, i4);
+
+      return compare(new Text(new String(b1)), new Text(new String(b2)));
+    }
+
+    @Override
+    public int compare(Text o1, Text o2) {
+      String s1 = o1.toString();
+      String s2 = o2.toString();
+      s1 = s1.substring(0, s1.indexOf("|"));
+      s2 = s2.substring(0, s2.indexOf("|"));
+      return s1.compareTo(s2);
+    }
+
+  }
+
+  @Test
+  public void testCombiner() throws Exception {
+    if (!new File(TEST_ROOT_DIR).mkdirs()) {
+      throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
+    }
+    File in = new File(TEST_ROOT_DIR, "input");
+    if (!in.mkdirs()) {
+      throw new RuntimeException("Could not create test dir: " + in);
+    }
+    File out = new File(TEST_ROOT_DIR, "output");
+    PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
+    pw.println("A|a,1");
+    pw.println("A|b,2");
+    pw.println("B|a,3");
+    pw.println("B|b,4");
+    pw.println("B|c,5");
+    pw.close();
+    JobConf job = new JobConf();
+    job.set("mapreduce.framework.name", "local");
+    TextInputFormat.setInputPaths(job, new Path(in.getPath()));
+    TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    job.setInputFormat(TextInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputValueGroupingComparator(GroupComparator.class);
+
+    job.setCombinerClass(Combiner.class);
+    job.setCombinerKeyGroupingComparator(GroupComparator.class);
+    job.setInt("min.num.spills.for.combine", 0);
+
+    JobClient client = new JobClient(job);
+    RunningJob runningJob = client.submitJob(job);
+    runningJob.waitForCompletion();
+    if (runningJob.isSuccessful()) {
+      Counters counters = runningJob.getCounters();
+
+      long combinerInputRecords = counters.getGroup(
+          "org.apache.hadoop.mapreduce.TaskCounter").
+          getCounter("COMBINE_INPUT_RECORDS");
+      long combinerOutputRecords = counters.getGroup(
+          "org.apache.hadoop.mapreduce.TaskCounter").
+          getCounter("COMBINE_OUTPUT_RECORDS");
+      Assert.assertTrue(combinerInputRecords > 0);
+      Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
+
+      BufferedReader br = new BufferedReader(new FileReader(
+          new File(out, "part-00000")));
+      Set<String> output = new HashSet<String>();
+      String line = br.readLine();
+      Assert.assertNotNull(line);
+      output.add(line.substring(0, 1) + line.substring(4, 5));
+      line = br.readLine();
+      Assert.assertNotNull(line);
+      output.add(line.substring(0, 1) + line.substring(4, 5));
+      line = br.readLine();
+      Assert.assertNull(line);
+      br.close();
+
+      Set<String> expected = new HashSet<String>();
+      expected.add("A2");
+      expected.add("B5");
+
+      Assert.assertEquals(expected, output);
+
+    } else {
+      Assert.fail("Job failed");
+    }
+  }
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java?rev=1555968&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java Mon Jan  6 18:35:26 2014
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestNewCombinerGrouping {
+  /**
+   * Absolute path of the scratch directory used by this test: a folder with
+   * a random UUID name under the relative "build" directory. Never
+   * reassigned, hence declared as a constant.
+   */
+  private static final String TEST_ROOT_DIR =
+      new File("build", UUID.randomUUID().toString()).getAbsolutePath();
+
+  /**
+   * Mapper that splits every input line at its first comma: the text before
+   * the comma becomes the output key and the text after it is parsed as a
+   * long and becomes the output value.
+   */
+  public static class Map extends
+      Mapper<LongWritable, Text, Text, LongWritable> {
+
+    /**
+     * Emits one (key, number) pair for the given line.
+     *
+     * @param key input key supplied by the input format; not used here
+     * @param value the input line, expected to look like "key,number"
+     * @param context sink for the emitted pair
+     * @throws IOException if writing to the context fails
+     * @throws InterruptedException if writing to the context is interrupted
+     */
+    @Override
+    protected void map(LongWritable key, Text value,
+        Context context)
+        throws IOException, InterruptedException {
+      String record = value.toString();
+      // Locate the separator once; everything after the first comma is the
+      // numeric part.
+      int comma = record.indexOf(",");
+      String outKey = record.substring(0, comma);
+      String number = record.substring(comma + 1);
+      context.write(new Text(outKey), new LongWritable(Long.parseLong(number)));
+    }
+  }
+
+  /**
+   * Reducer that emits, for each group of values, the largest value seen.
+   */
+  public static class Reduce extends
+      Reducer<Text, LongWritable, Text, LongWritable> {
+
+    /**
+     * Writes the key together with the maximum of the supplied values.
+     *
+     * @param key key of the group being reduced
+     * @param values values belonging to the group
+     * @param context sink for the emitted (key, maximum) pair
+     * @throws IOException if writing to the context fails
+     * @throws InterruptedException if writing to the context is interrupted
+     */
+    @Override
+    protected void reduce(Text key, Iterable<LongWritable> values,
+        Context context)
+        throws IOException, InterruptedException {
+      LongWritable maxValue = null;
+      for (LongWritable value : values) {
+        // The framework hands out the same LongWritable instance on every
+        // iteration, so keeping a reference to it would silently track the
+        // last value instead of the maximum. Copy the primitive instead.
+        if (maxValue == null) {
+          maxValue = new LongWritable(value.get());
+        } else if (value.get() > maxValue.get()) {
+          maxValue.set(value.get());
+        }
+      }
+      // maxValue stays null only if the group had no values at all.
+      context.write(key, maxValue);
+    }
+  }
+
+  /**
+   * Combiner class registered with the job in the test. It declares no
+   * members of its own, so it behaves exactly like {@link Reduce}.
+   */
+  public static class Combiner extends Reduce {
+  }
+
+  /**
+   * Grouping comparator that treats two {@link Text} keys as equal when the
+   * parts before their first '|' character are equal.
+   */
+  public static class GroupComparator implements RawComparator<Text> {
+
+    /**
+     * Compares two serialized {@link Text} keys by their prefix before '|'.
+     *
+     * @param bytes buffer holding the first serialized key
+     * @param i offset of the first key in {@code bytes}
+     * @param i2 length in bytes of the first serialized key
+     * @param bytes2 buffer holding the second serialized key
+     * @param i3 offset of the second key in {@code bytes2}
+     * @param i4 length in bytes of the second serialized key
+     * @return the result of {@link #compare(Text, Text)} on the decoded keys
+     */
+    @Override
+    public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
+        int i4) {
+      // A serialized Text is a vint byte count followed by the UTF-8 payload.
+      // Skip the count so it is not decoded as part of the key; otherwise
+      // keys of different lengths never fall into the same group.
+      int n1 = WritableUtils.decodeVIntSize(bytes[i]);
+      int n2 = WritableUtils.decodeVIntSize(bytes2[i3]);
+
+      // Let Text decode the payload as UTF-8 rather than going through
+      // new String(byte[]), which depends on the platform charset.
+      Text t1 = new Text();
+      t1.set(bytes, i + n1, i2 - n1);
+      Text t2 = new Text();
+      t2.set(bytes2, i3 + n2, i4 - n2);
+
+      return compare(t1, t2);
+    }
+
+    /**
+     * Compares two keys by the substring that precedes their first '|'.
+     * A key without '|' makes {@code substring} throw
+     * {@link StringIndexOutOfBoundsException}.
+     *
+     * @param o1 first key
+     * @param o2 second key
+     * @return lexicographic comparison of the two prefixes
+     */
+    @Override
+    public int compare(Text o1, Text o2) {
+      String s1 = o1.toString();
+      String s2 = o2.toString();
+      s1 = s1.substring(0, s1.indexOf("|"));
+      s2 = s2.substring(0, s2.indexOf("|"));
+      return s1.compareTo(s2);
+    }
+
+  }
+
+  /**
+   * Runs a local job over five "group|item,number" records with
+   * {@link GroupComparator} set as both the reduce-side and the combiner-side
+   * grouping comparator, then checks that:
+   * <ul>
+   *   <li>the combiner consumed records and emitted fewer than it consumed;</li>
+   *   <li>the output file has exactly two lines whose first character and
+   *       fifth character form "A2" and "B5".</li>
+   * </ul>
+   *
+   * @throws Exception if the test directories cannot be created or the job
+   *     cannot be run
+   */
+  @Test
+  public void testCombiner() throws Exception {
+    if (!new File(TEST_ROOT_DIR).mkdirs()) {
+      throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
+    }
+    File in = new File(TEST_ROOT_DIR, "input");
+    if (!in.mkdirs()) {
+      throw new RuntimeException("Could not create test dir: " + in);
+    }
+    File out = new File(TEST_ROOT_DIR, "output");
+
+    // Close the writer even if a write fails so the file handle is not leaked.
+    PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
+    try {
+      pw.println("A|a,1");
+      pw.println("A|b,2");
+      pw.println("B|a,3");
+      pw.println("B|b,4");
+      pw.println("B|c,5");
+    } finally {
+      pw.close();
+    }
+
+    JobConf conf = new JobConf();
+    conf.set("mapreduce.framework.name", "local");
+    Job job = new Job(conf);
+    TextInputFormat.setInputPaths(job, new Path(in.getPath()));
+    TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
+
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setGroupingComparatorClass(GroupComparator.class);
+
+    job.setCombinerKeyGroupingComparatorClass(GroupComparator.class);
+    job.setCombinerClass(Combiner.class);
+    // NOTE(review): presumably 0 lets the combiner run regardless of how many
+    // spills the map side produced -- confirm against the framework docs.
+    job.getConfiguration().setInt("min.num.spills.for.combine", 0);
+
+    job.submit();
+    job.waitForCompletion(false);
+    if (!job.isSuccessful()) {
+      Assert.fail("Job failed");
+    }
+
+    Counters counters = job.getCounters();
+    long combinerInputRecords = counters.findCounter(
+        "org.apache.hadoop.mapreduce.TaskCounter",
+        "COMBINE_INPUT_RECORDS").getValue();
+    long combinerOutputRecords = counters.findCounter(
+        "org.apache.hadoop.mapreduce.TaskCounter",
+        "COMBINE_OUTPUT_RECORDS").getValue();
+    Assert.assertTrue(combinerInputRecords > 0);
+    Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
+
+    Set<String> output = new HashSet<String>();
+    BufferedReader br = new BufferedReader(new FileReader(
+        new File(out, "part-r-00000")));
+    try {
+      // Exactly two result lines are expected; keep the first character and
+      // the fifth character of each.
+      for (int n = 0; n < 2; n++) {
+        String line = br.readLine();
+        Assert.assertNotNull(line);
+        output.add(line.substring(0, 1) + line.substring(4, 5));
+      }
+      Assert.assertNull(br.readLine());
+    } finally {
+      // Release the reader even when one of the assertions above fails.
+      br.close();
+    }
+
+    Set<String> expected = new HashSet<String>();
+    expected.add("A2");
+    expected.add("B5");
+
+    Assert.assertEquals(expected, output);
+  }
+
+}