You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/12/20 23:01:14 UTC
svn commit: r1221501 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/...
Author: acmurthy
Date: Tue Dec 20 22:01:13 2011
New Revision: 1221501
URL: http://svn.apache.org/viewvc?rev=1221501&view=rev
Log:
MAPREDUCE-3376. Fixed Task to ensure it passes the reporter to combiners that use the old MR API. Contributed by Subroto Sanyal.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1221501&r1=1221500&r2=1221501&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Dec 20 22:01:13 2011
@@ -337,6 +337,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3563. Fixed LocalJobRunner to work correctly with new mapreduce
apis. (acmurthy)
+ MAPREDUCE-3376. Fixed Task to ensure it passes reporter to combiners using
+ old MR api. (Subroto Sanyal via acmurthy)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1221501&r1=1221500&r2=1221501&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue Dec 20 22:01:13 2011
@@ -1458,11 +1458,11 @@ abstract public class Task implements Wr
try {
CombineValuesIterator<K,V> values =
new CombineValuesIterator<K,V>(kvIter, comparator, keyClass,
- valueClass, job, Reporter.NULL,
+ valueClass, job, reporter,
inputCounter);
while (values.more()) {
combiner.reduce(values.getKey(), values, combineCollector,
- Reporter.NULL);
+ reporter);
values.nextKey();
}
} finally {
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java?rev=1221501&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java Tue Dec 20 22:01:13 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.CustomOutputCommitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Verifies that combiners written against the old (org.apache.hadoop.mapred)
+ * API are handed a real {@link Reporter} rather than {@link Reporter#NULL}
+ * (MAPREDUCE-3376).
+ */
+@SuppressWarnings("deprecation")
+public class TestMRAppWithCombiner {
+
+  protected static MiniMRYarnCluster mrCluster;
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  private static final Log LOG = LogFactory.getLog(TestMRAppWithCombiner.class);
+
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  /**
+   * Starts a shared MiniMRYarnCluster and stages the MR app jar. If the app
+   * jar has not been built, logs that fact and leaves {@code mrCluster} null.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(
+          TestMRAppWithCombiner.class.getName(), 3);
+      Configuration clusterConf = new Configuration();
+      mrCluster.init(clusterConf);
+      mrCluster.start();
+    }
+
+    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+    // workaround the absent public discache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR),
+        TestMRJobs.APP_JAR);
+    localFs.setPermission(TestMRJobs.APP_JAR, new FsPermission("700"));
+  }
+
+  /** Stops the shared cluster, if one was started. */
+  @AfterClass
+  public static void tearDown() {
+    if (mrCluster != null) {
+      mrCluster.stop();
+      mrCluster = null;
+    }
+  }
+
+  /**
+   * Runs an old-API job whose combiner throws if it is handed
+   * {@link Reporter#NULL}, and asserts that the job succeeds.
+   */
+  @Test
+  public void testCombinerShouldUpdateTheReporter() throws Exception {
+    if (mrCluster == null) {
+      // setup() skipped cluster creation because the MR app jar is missing.
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+    JobConf conf = new JobConf(mrCluster.getConfig());
+    int numMaps = 5;
+    int numReds = 2;
+    Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+        "testCombinerShouldUpdateTheReporter-in");
+    Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+        "testCombinerShouldUpdateTheReporter-out");
+    createInputOutPutFolder(in, out, numMaps);
+    conf.setJobName("test-job-with-combiner");
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setCombinerClass(MyCombinerToCheckReporter.class);
+    conf.setReducerClass(IdentityReducer.class);
+    DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
+    conf.setOutputCommitter(CustomOutputCommitter.class);
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    FileInputFormat.setInputPaths(conf, in);
+    FileOutputFormat.setOutputPath(conf, out);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReds);
+
+    // A failing combiner only fails the job, not this JVM, so the job's
+    // outcome must be checked for the test to detect the regression.
+    Assert.assertTrue("Job with combiner failed", runJob(conf));
+  }
+
+  /**
+   * Deletes {@code outDir} if it exists, creates {@code inDir} if it does
+   * not, and writes {@code numMaps} identical small text files into it.
+   */
+  static void createInputOutPutFolder(Path inDir, Path outDir, int numMaps)
+      throws Exception {
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    if (!fs.exists(inDir)) {
+      fs.mkdirs(inDir);
+    }
+    String input = "The quick brown fox\n" + "has many silly\n"
+        + "red fox sox\n";
+    for (int i = 0; i < numMaps; ++i) {
+      DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+      try {
+        file.writeBytes(input);
+      } finally {
+        file.close();
+      }
+    }
+  }
+
+  /**
+   * Submits {@code conf} and blocks until the job completes.
+   *
+   * @return the result of {@link JobClient#monitorAndPrintJob}, which is
+   *         true if the job succeeded
+   */
+  static boolean runJob(JobConf conf) throws Exception {
+    JobClient jobClient = new JobClient(conf);
+    RunningJob job = jobClient.submitJob(conf);
+    return jobClient.monitorAndPrintJob(conf, job);
+  }
+
+  /**
+   * Old-API combiner that emits nothing and throws an AssertionError if it is
+   * handed {@link Reporter#NULL}. It must be static: the framework creates
+   * combiners reflectively and needs a no-argument constructor, which a
+   * non-static inner class does not have.
+   */
+  static class MyCombinerToCheckReporter<K, V> extends IdentityReducer<K, V> {
+    @Override
+    public void reduce(K key, Iterator<V> values, OutputCollector<K, V> output,
+        Reporter reporter) throws IOException {
+      if (Reporter.NULL == reporter) {
+        Assert.fail("A valid Reporter should have been used but, Reporter.NULL is used");
+      }
+    }
+  }
+
+}