You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/12/20 23:01:14 UTC

svn commit: r1221501 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/...

Author: acmurthy
Date: Tue Dec 20 22:01:13 2011
New Revision: 1221501

URL: http://svn.apache.org/viewvc?rev=1221501&view=rev
Log:
MAPREDUCE-3376. Fixed Task to ensure it passes reporter to combiners using old MR api. Contributed by Subroto Sanyal.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1221501&r1=1221500&r2=1221501&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Dec 20 22:01:13 2011
@@ -337,6 +337,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3563. Fixed LocalJobRunner to work correctly with new mapreduce
     apis. (acmurthy)
 
+    MAPREDUCE-3376. Fixed Task to ensure it passes reporter to combiners using
+    old MR api. (Subroto Sanyal via acmurthy)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1221501&r1=1221500&r2=1221501&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue Dec 20 22:01:13 2011
@@ -1458,11 +1458,11 @@ abstract public class Task implements Wr
       try {
         CombineValuesIterator<K,V> values = 
           new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, 
-                                         valueClass, job, Reporter.NULL,
+                                         valueClass, job, reporter,
                                          inputCounter);
         while (values.more()) {
           combiner.reduce(values.getKey(), values, combineCollector,
-              Reporter.NULL);
+              reporter);
           values.nextKey();
         }
       } finally {

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java?rev=1221501&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java Tue Dec 20 22:01:13 2011
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.CustomOutputCommitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Runs an old-API ({@code org.apache.hadoop.mapred}) job with a combiner on a
+ * {@link MiniMRYarnCluster} and checks that the combiner receives a real
+ * {@link Reporter} rather than {@link Reporter#NULL}.
+ */
+@SuppressWarnings("deprecation")
+public class TestMRAppWithCombiner {
+
+  protected static MiniMRYarnCluster mrCluster;
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  private static final Log LOG = LogFactory.getLog(TestMRAppWithCombiner.class);
+
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  /**
+   * Returns whether the MR application jar needed by the mini cluster exists.
+   * When it does not, no cluster is started and the test is skipped.
+   */
+  private static boolean isAppJarPresent() {
+    return new File(MiniMRYarnCluster.APPJAR).exists();
+  }
+
+  /**
+   * Starts the shared mini MR/YARN cluster (once) and stages a private copy
+   * of the MR application jar. Does nothing if the jar is missing.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    if (!isAppJarPresent()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      // Name the cluster after this test rather than TestMRJobs; presumably
+      // the name selects the cluster's test work directory.
+      mrCluster = new MiniMRYarnCluster(
+          TestMRAppWithCombiner.class.getName(), 3);
+      mrCluster.init(new Configuration());
+      mrCluster.start();
+    }
+
+    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+    // workaround the absent public discache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR),
+        TestMRJobs.APP_JAR);
+    localFs.setPermission(TestMRJobs.APP_JAR, new FsPermission("700"));
+  }
+
+  /** Stops the shared cluster, if one was started. */
+  @AfterClass
+  public static void tearDown() {
+    if (mrCluster != null) {
+      mrCluster.stop();
+      mrCluster = null;
+    }
+  }
+
+  /**
+   * Runs an old-API job whose combiner fails when handed
+   * {@link Reporter#NULL}, and asserts that the job succeeds.
+   */
+  @Test
+  public void testCombinerShouldUpdateTheReporter() throws Exception {
+    if (!isAppJarPresent()) {
+      // setup() did not start a cluster; mrCluster is null.
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    JobConf jobConf = new JobConf(mrCluster.getConfig());
+    int numMaps = 5;
+    int numReds = 2;
+    Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+        "testCombinerShouldUpdateTheReporter-in");
+    Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+        "testCombinerShouldUpdateTheReporter-out");
+    createInputOutPutFolder(in, out, numMaps);
+    jobConf.setJobName("test-job-with-combiner");
+    jobConf.setMapperClass(IdentityMapper.class);
+    jobConf.setCombinerClass(MyCombinerToCheckReporter.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, jobConf);
+    jobConf.setOutputCommitter(CustomOutputCommitter.class);
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setOutputKeyClass(LongWritable.class);
+    jobConf.setOutputValueClass(Text.class);
+
+    FileInputFormat.setInputPaths(jobConf, in);
+    FileOutputFormat.setOutputPath(jobConf, out);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(numReds);
+
+    // The combiner's Assert.fail executes inside a task, so a missing
+    // Reporter is visible here only through the job's outcome.
+    Assert.assertTrue("Job with combiner failed", runJob(jobConf));
+  }
+
+  /**
+   * Deletes {@code outDir} if it exists, creates {@code inDir} if it does
+   * not, and writes {@code numMaps} small text files named part-0, part-1,
+   * ... into {@code inDir}.
+   */
+  static void createInputOutPutFolder(Path inDir, Path outDir, int numMaps)
+      throws Exception {
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    if (!fs.exists(inDir)) {
+      fs.mkdirs(inDir);
+    }
+    String input = "The quick brown fox\n" + "has many silly\n"
+        + "red fox sox\n";
+    for (int i = 0; i < numMaps; ++i) {
+      DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+      try {
+        file.writeBytes(input);
+      } finally {
+        file.close(); // release the stream even if the write fails
+      }
+    }
+  }
+
+  /**
+   * Submits the job described by {@code conf} and waits for it to finish.
+   *
+   * @return the result of {@link JobClient#monitorAndPrintJob}, i.e. whether
+   *         the job succeeded
+   */
+  static boolean runJob(JobConf conf) throws Exception {
+    JobClient jobClient = new JobClient(conf);
+    RunningJob job = jobClient.submitJob(conf);
+    return jobClient.monitorAndPrintJob(conf, job);
+  }
+
+  /**
+   * Combiner that fails if it is handed {@link Reporter#NULL}; it emits no
+   * records. The framework receives only its {@code Class}, so it must be
+   * constructed reflectively: it is a public static nested class so that it
+   * has a public no-argument constructor (a non-static inner class would
+   * need an enclosing TestMRAppWithCombiner instance).
+   */
+  public static class MyCombinerToCheckReporter<K, V>
+      extends IdentityReducer<K, V> {
+    @Override
+    public void reduce(K key, Iterator<V> values, OutputCollector<K, V> output,
+        Reporter reporter) throws IOException {
+      if (Reporter.NULL == reporter) {
+        Assert.fail("A valid Reporter should have been used but, Reporter.NULL is used");
+      }
+    }
+  }
+
+}