You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2011/05/23 19:06:09 UTC
svn commit: r1126591 - in /hadoop/mapreduce/trunk: ./
src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/
src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduc...
Author: amarrk
Date: Mon May 23 17:06:08 2011
New Revision: 1126591
URL: http://svn.apache.org/viewvc?rev=1126591&view=rev
Log:
MAPREDUCE-2492. The new MapReduce API should make available task's progress to the task. (amarrk)
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java
hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon May 23 17:06:08 2011
@@ -14,6 +14,9 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ MAPREDUCE-2492. The new MapReduce API should make available task's
+ progress to the task. (amarrk)
+
MAPREDUCE-2153. Bring in more job configuration properties in to the trace
file. (Rajesh Balamohan via amarrk)
Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java Mon May 23 17:06:08 2011
@@ -58,5 +58,10 @@ public class MockReporter extends Status
return counter;
}
+
+ @Override
+ public float getProgress() {
+ return 0;
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java Mon May 23 17:06:08 2011
@@ -92,5 +92,9 @@ public class MockReporter implements Rep
return counter;
}
+
+ public float getProgress() {
+ return 0;
+ };
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon May 23 17:06:08 2011
@@ -59,7 +59,6 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
@@ -294,8 +293,16 @@ class MapTask extends Task {
this.umbilical = umbilical;
if (isMapTask()) {
- mapPhase = getProgress().addPhase("map", 0.667f);
- sortPhase = getProgress().addPhase("sort", 0.333f);
+ // If there are no reducers then there won't be any sort. Hence the map
+ // phase will govern the entire attempt's progress.
+ if (conf.getNumReduceTasks() == 0) {
+ mapPhase = getProgress().addPhase("map", 1.0f);
+ } else {
+ // If there are reducers then the entire attempt's progress will be
+ // split between the map phase (67%) and the sort phase (33%).
+ mapPhase = getProgress().addPhase("map", 0.667f);
+ sortPhase = getProgress().addPhase("sort", 0.333f);
+ }
}
TaskReporter reporter = startReporter(umbilical);
@@ -388,7 +395,10 @@ class MapTask extends Task {
try {
runner.run(in, new OldOutputCollector(collector, conf), reporter);
mapPhase.complete();
- setPhase(TaskStatus.Phase.SORT);
+ // start the sort phase only if there are reducers
+ if (numReduceTasks > 0) {
+ setPhase(TaskStatus.Phase.SORT);
+ }
statusUpdate(umbilical);
collector.flush();
} finally {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon May 23 17:06:08 2011
@@ -361,6 +361,8 @@ public class ReduceTask extends Task {
taskStatus, copyPhase, sortPhase, this);
rIter = shuffle.run();
} else {
+ // local job runner doesn't have a copy phase
+ copyPhase.complete();
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
job.getMapOutputValueClass(), codec,
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Mon May 23 17:06:08 2011
@@ -64,6 +64,10 @@ public interface Reporter extends Progre
public InputSplit getInputSplit() throws UnsupportedOperationException {
throw new UnsupportedOperationException("NULL reporter has no input");
}
+ @Override
+ public float getProgress() {
+ return 0;
+ }
};
/**
@@ -120,4 +124,10 @@ public interface Reporter extends Progre
*/
public abstract InputSplit getInputSplit()
throws UnsupportedOperationException;
+
+ /**
+ * Get the progress of the task. Progress is represented as a number between
+ * 0 and 1 (inclusive).
+ */
+ public float getProgress();
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon May 23 17:06:08 2011
@@ -569,6 +569,11 @@ abstract public class Task implements Wr
// indicate that progress update needs to be sent
setProgressFlag();
}
+
+ public float getProgress() {
+ return taskProgress.getProgress();
+ };
+
public void progress() {
// indicate that progress update needs to be sent
setProgressFlag();
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java Mon May 23 17:06:08 2011
@@ -60,6 +60,11 @@ public class TaskAttemptContextImpl
public JobConf getJobConf() {
return (JobConf) getConfiguration();
}
+
+ @Override
+ public float getProgress() {
+ return reporter.getProgress();
+ }
@Override
public Counter getCounter(Enum<?> counterName) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java Mon May 23 17:06:08 2011
@@ -24,5 +24,11 @@ public abstract class StatusReporter {
public abstract Counter getCounter(Enum<?> name);
public abstract Counter getCounter(String group, String name);
public abstract void progress();
+ /**
+ * Get the current progress.
+ * @return a number between 0.0 and 1.0 (inclusive) indicating the attempt's
+ * progress.
+ */
+ public abstract float getProgress();
public abstract void setStatus(String status);
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java Mon May 23 17:06:08 2011
@@ -44,6 +44,13 @@ public interface TaskAttemptContext exte
* @return the current status message
*/
public String getStatus();
+
+ /**
+ * The current progress of the task attempt.
+ * @return a number between 0.0 and 1.0 (inclusive) indicating the attempt's
+ * progress.
+ */
+ public abstract float getProgress();
/**
* Get the {@link Counter} for the given <code>counterName</code>.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Mon May 23 17:06:08 2011
@@ -316,4 +316,8 @@ class ChainMapContextImpl<KEYIN, VALUEIN
return base.getCredentials();
}
+ @Override
+ public float getProgress() {
+ return base.getProgress();
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Mon May 23 17:06:08 2011
@@ -308,4 +308,9 @@ class ChainReduceContextImpl<KEYIN, VALU
public Credentials getCredentials() {
return base.getCredentials();
}
+
+ @Override
+ public float getProgress() {
+ return base.getProgress();
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java Mon May 23 17:06:08 2011
@@ -386,6 +386,11 @@ public abstract static class Node extend
}
@Override
+ public float getProgress() {
+ return context.getProgress();
+ }
+
+ @Override
public void setStatus(String status) {
context.setStatus(status);
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Mon May 23 17:06:08 2011
@@ -240,6 +240,10 @@ public class MultithreadedMapper<K1, V1,
outer.setStatus(status);
}
+ @Override
+ public float getProgress() {
+ return outer.getProgress();
+ }
}
private class MapRunner extends Thread {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Mon May 23 17:06:08 2011
@@ -317,5 +317,10 @@ public class WrappedMapper<KEYIN, VALUEI
public Credentials getCredentials() {
return mapContext.getCredentials();
}
+
+ @Override
+ public float getProgress() {
+ return mapContext.getProgress();
+ }
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java Mon May 23 17:06:08 2011
@@ -471,6 +471,11 @@ public class MultipleOutputs<KEYOUT, VAL
}
@Override
+ public float getProgress() {
+ return context.getProgress();
+ }
+
+ @Override
public void setStatus(String status) {
context.setStatus(status);
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Mon May 23 17:06:08 2011
@@ -321,5 +321,10 @@ public class WrappedReducer<KEYIN, VALUE
public Credentials getCredentials() {
return reduceContext.getCredentials();
}
+
+ @Override
+ public float getProgress() {
+ return reduceContext.getProgress();
+ }
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java Mon May 23 17:06:08 2011
@@ -107,5 +107,13 @@ public class TaskAttemptContextImpl exte
public Counter getCounter(String group, String name) {
return new Counters().findCounter(group, name);
}
+ public float getProgress() {
+ return 0f;
+ }
+ }
+
+ @Override
+ public float getProgress() {
+ return reporter.getProgress();
}
}
\ No newline at end of file
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Mon May 23 17:06:08 2011
@@ -94,7 +94,7 @@ public class TestMapProgress extends Tes
}
// validate map task progress when the map task is in map phase
assertTrue("Map progress is not the expected value.",
- Math.abs(mapTaskProgress - ((0.667/3)*recordNum)) < 0.001);
+ Math.abs(mapTaskProgress - ((float)recordNum/3)) < 0.001);
}
}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java?rev=1126591&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java Mon May 23 17:06:08 2011
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests the old mapred APIs with {@link Reporter#getProgress()}.
+ */
+public class TestReporter {
+ private static final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp"));
+ private static final Path testRootTempDir =
+ new Path(rootTempDir, "TestReporter");
+
+ private static FileSystem fs = null;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ fs = FileSystem.getLocal(new Configuration());
+ fs.delete(testRootTempDir, true);
+ fs.mkdirs(testRootTempDir);
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ fs.delete(testRootTempDir, true);
+ }
+
+ // an input with 4 lines
+ private static final String INPUT = "Hi\nHi\nHi\nHi\n";
+ private static final int INPUT_LINES = INPUT.split("\n").length;
+
+ @SuppressWarnings("deprecation")
+ static class ProgressTesterMapper extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, Text> {
+ private float progressRange = 0;
+ private int numRecords = 0;
+ private Reporter reporter = null;
+
+ @Override
+ public void configure(JobConf job) {
+ super.configure(job);
+ // set the progress range accordingly
+ if (job.getNumReduceTasks() == 0) {
+ progressRange = 1f;
+ } else {
+ progressRange = 0.667f;
+ }
+ }
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, Text> output, Reporter reporter)
+ throws IOException {
+ this.reporter = reporter;
+
+ // calculate the actual map progress
+ float mapProgress = ((float)++numRecords)/INPUT_LINES;
+ // calculate the attempt progress based on the progress range
+ float attemptProgress = progressRange * mapProgress;
+ assertEquals("Invalid progress in map",
+ attemptProgress, reporter.getProgress(), 0f);
+ output.collect(new Text(value.toString() + numRecords), value);
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ assertEquals("Invalid progress in map cleanup",
+ progressRange, reporter.getProgress(), 0f);
+ }
+ }
+
+ /**
+ * Test {@link Reporter}'s progress for a map-only job.
+ * This will make sure that only the map phase decides the attempt's progress.
+ */
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testReporterProgressForMapOnlyJob() throws IOException {
+ Path test = new Path(testRootTempDir, "testReporterProgressForMapOnlyJob");
+
+ JobConf conf = new JobConf();
+ conf.setMapperClass(ProgressTesterMapper.class);
+ conf.setMapOutputKeyClass(Text.class);
+ // fail early
+ conf.setMaxMapAttempts(1);
+ conf.setMaxReduceAttempts(0);
+
+ RunningJob job =
+ UtilsForTests.runJob(conf, new Path(test, "in"), new Path(test, "out"),
+ 1, 0, INPUT);
+ job.waitForCompletion();
+
+ assertTrue("Job failed", job.isSuccessful());
+ }
+
+ /**
+ * A {@link Reducer} implementation that checks the progress on every call
+ * to {@link Reducer#reduce(Object, Iterator, OutputCollector, Reporter)}.
+ */
+ @SuppressWarnings("deprecation")
+ static class ProgressTestingReducer extends MapReduceBase
+ implements Reducer<Text, Text, Text, Text> {
+ private int recordCount = 0;
+ private Reporter reporter = null;
+ // reduce task has a fixed split of progress amongst copy, shuffle and
+ // reduce phases.
+ private final float REDUCE_PROGRESS_RANGE = 1.0f/3;
+ private final float SHUFFLE_PROGRESS_RANGE = 1 - REDUCE_PROGRESS_RANGE;
+
+ @Override
+ public void configure(JobConf job) {
+ super.configure(job);
+ }
+
+ @Override
+ public void reduce(Text key, Iterator<Text> values,
+ OutputCollector<Text, Text> output, Reporter reporter)
+ throws IOException {
+ float reducePhaseProgress = ((float)++recordCount)/INPUT_LINES;
+ float weightedReducePhaseProgress =
+ reducePhaseProgress * REDUCE_PROGRESS_RANGE;
+ assertEquals("Invalid progress in reduce",
+ SHUFFLE_PROGRESS_RANGE + weightedReducePhaseProgress,
+ reporter.getProgress(), 0.02f);
+ this.reporter = reporter;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ assertEquals("Invalid progress in reduce cleanup",
+ 1.0f, reporter.getProgress(), 0f);
+ }
+ }
+
+ /**
+ * Test {@link Reporter}'s progress for map-reduce job.
+ */
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testReporterProgressForMRJob() throws IOException {
+ Path test = new Path(testRootTempDir, "testReporterProgressForMRJob");
+
+ JobConf conf = new JobConf();
+ conf.setMapperClass(ProgressTesterMapper.class);
+ conf.setReducerClass(ProgressTestingReducer.class);
+ conf.setMapOutputKeyClass(Text.class);
+ // fail early
+ conf.setMaxMapAttempts(1);
+ conf.setMaxReduceAttempts(1);
+
+ RunningJob job =
+ UtilsForTests.runJob(conf, new Path(test, "in"), new Path(test, "out"),
+ 1, 1, INPUT);
+ job.waitForCompletion();
+
+ assertTrue("Job failed", job.isSuccessful());
+ }
+}
\ No newline at end of file
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Mon May 23 17:06:08 2011
@@ -559,6 +559,16 @@ public class UtilsForTests {
static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
int numReds) throws IOException {
+ String input = "The quick brown fox\n" + "has many silly\n"
+ + "red fox sox\n";
+
+ // submit the job and wait for it to complete
+ return runJob(conf, inDir, outDir, numMaps, numReds, input);
+ }
+
+ // Start a job with the specified input and return its RunningJob object
+ static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
+ int numReds, String input) throws IOException {
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
@@ -566,8 +576,7 @@ public class UtilsForTests {
if (!fs.exists(inDir)) {
fs.mkdirs(inDir);
}
- String input = "The quick brown fox\n" + "has many silly\n"
- + "red fox sox\n";
+
for (int i = 0; i < numMaps; ++i) {
DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
file.writeBytes(input);
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Mon May 23 17:06:08 2011
@@ -388,6 +388,10 @@ public class MapReduceTestUtil {
}
public void progress() {
}
+ @Override
+ public float getProgress() {
+ return 0;
+ }
public Counter getCounter(Enum<?> name) {
return new Counters().findCounter(name);
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java Mon May 23 17:06:08 2011
@@ -18,16 +18,45 @@
package org.apache.hadoop.mapreduce;
import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil.DataCopyMapper;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil.DataCopyReducer;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
/**
- * Tests context api.
+ * Tests context api and {@link StatusReporter#getProgress()} via
+ * {@link TaskAttemptContext#getProgress()} API .
*/
public class TestTaskContext extends HadoopTestCase {
+ private static final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp"));
+ private static final Path testRootTempDir =
+ new Path(rootTempDir, "TestTaskContext");
+
+ private static FileSystem fs = null;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ fs = FileSystem.getLocal(new Configuration());
+ fs.delete(testRootTempDir, true);
+ fs.mkdirs(testRootTempDir);
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ fs.delete(testRootTempDir, true);
+ }
+
public TestTaskContext() throws IOException {
super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 1, 1);
}
@@ -48,16 +77,188 @@ public class TestTaskContext extends Had
* @throws InterruptedException
* @throws ClassNotFoundException
*/
+ @Test
public void testContextStatus()
throws IOException, InterruptedException, ClassNotFoundException {
+ Path test = new Path(testRootTempDir, "testContextStatus");
+
+ // test with 1 map and 0 reducers
+ // test with custom task status
int numMaps = 1;
- Job job = MapReduceTestUtil.createJob(createJobConf(), new Path("in"),
- new Path("out"), numMaps, 0);
+ Job job = MapReduceTestUtil.createJob(createJobConf(),
+ new Path(test, "in"), new Path(test, "out"), numMaps, 0);
job.setMapperClass(MyMapper.class);
job.waitForCompletion(true);
assertTrue("Job failed", job.isSuccessful());
TaskReport[] reports = job.getTaskReports(TaskType.MAP);
assertEquals(numMaps, reports.length);
- assertEquals(myStatus + " > sort", reports[0].getState());
+ assertEquals(myStatus, reports[0].getState());
+
+ // test with 1 map and 1 reducer
+ // test with default task status
+ int numReduces = 1;
+ job = MapReduceTestUtil.createJob(createJobConf(),
+ new Path(test, "in"), new Path(test, "out"), numMaps, numReduces);
+ job.setMapperClass(DataCopyMapper.class);
+ job.setReducerClass(DataCopyReducer.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ // fail early
+ job.setMaxMapAttempts(1);
+ job.setMaxReduceAttempts(0);
+
+ // run the job and wait for completion
+ job.waitForCompletion(true);
+ assertTrue("Job failed", job.isSuccessful());
+
+ // check map task reports
+ reports = job.getTaskReports(TaskType.MAP);
+ assertEquals(numMaps, reports.length);
+ assertEquals("map > sort", reports[0].getState());
+
+ // check reduce task reports
+ reports = job.getTaskReports(TaskType.REDUCE);
+ assertEquals(numReduces, reports.length);
+ assertEquals("reduce > reduce", reports[0].getState());
+ }
+
+ // an input with 4 lines
+ private static final String INPUT = "Hi\nHi\nHi\nHi\n";
+ private static final int INPUT_LINES = INPUT.split("\n").length;
+
+ @SuppressWarnings("unchecked")
+ static class ProgressCheckerMapper
+ extends Mapper<LongWritable, Text, Text, Text> {
+ private int recordCount = 0;
+ private float progressRange = 0;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ // check if the map task attempt progress is 0
+ assertEquals("Invalid progress in map setup",
+ 0.0f, context.getProgress(), 0f);
+
+ // define the progress boundaries
+ if (context.getNumReduceTasks() == 0) {
+ progressRange = 1f;
+ } else {
+ progressRange = 0.667f;
+ }
+ }
+
+ @Override
+ protected void map(LongWritable key, Text value,
+ org.apache.hadoop.mapreduce.Mapper.Context context)
+ throws IOException ,InterruptedException {
+ // get the map phase progress
+ float mapPhaseProgress = ((float)++recordCount)/INPUT_LINES;
+ // get the weighted map phase progress
+ float weightedMapProgress = progressRange * mapPhaseProgress;
+ // check the map progress
+ assertEquals("Invalid progress in map",
+ weightedMapProgress, context.getProgress(), 0f);
+
+ context.write(new Text(value.toString() + recordCount), value);
+ };
+
+ protected void cleanup(Mapper.Context context)
+ throws IOException, InterruptedException {
+ // check if the attempt progress is at the progress boundary
+ assertEquals("Invalid progress in map cleanup",
+ progressRange, context.getProgress(), 0f);
+ };
+ }
+
+ /**
+ * Tests new MapReduce map task's context.getProgress() method.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ public void testMapContextProgress()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ int numMaps = 1;
+
+ Path test = new Path(testRootTempDir, "testMapContextProgress");
+
+ Job job = MapReduceTestUtil.createJob(createJobConf(),
+ new Path(test, "in"), new Path(test, "out"), numMaps, 0, INPUT);
+ job.setMapperClass(ProgressCheckerMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+
+ // fail early
+ job.setMaxMapAttempts(1);
+
+ job.waitForCompletion(true);
+ assertTrue("Job failed", job.isSuccessful());
+ }
+
+ @SuppressWarnings("unchecked")
+ static class ProgressCheckerReducer extends Reducer<Text, Text,
+ Text, Text> {
+ private int recordCount = 0;
+ private final float REDUCE_PROGRESS_RANGE = 1.0f/3;
+ private final float SHUFFLE_PROGRESS_RANGE = 1 - REDUCE_PROGRESS_RANGE;
+
+ protected void setup(final Reducer.Context context)
+ throws IOException, InterruptedException {
+ // Note that the reduce will read some segments before calling setup()
+ float reducePhaseProgress = ((float)++recordCount)/INPUT_LINES;
+ float weightedReducePhaseProgress =
+ REDUCE_PROGRESS_RANGE * reducePhaseProgress;
+ // check that the shuffle phase progress is accounted for
+ assertEquals("Invalid progress in reduce setup",
+ SHUFFLE_PROGRESS_RANGE + weightedReducePhaseProgress,
+ context.getProgress(), 0.01f);
+ };
+
+ public void reduce(Text key, Iterator<Text> values, Context context)
+ throws IOException, InterruptedException {
+ float reducePhaseProgress = ((float)++recordCount)/INPUT_LINES;
+ float weightedReducePhaseProgress =
+ REDUCE_PROGRESS_RANGE * reducePhaseProgress;
+ assertEquals("Invalid progress in reduce",
+ SHUFFLE_PROGRESS_RANGE + weightedReducePhaseProgress,
+ context.getProgress(), 0.01f);
+ }
+
+ protected void cleanup(Reducer.Context context)
+ throws IOException, InterruptedException {
+ // check that the reduce task's progress is 1.0 at the end
+ assertEquals("Invalid progress in reduce cleanup",
+ 1.0f, context.getProgress(), 0f);
+ };
+ }
+
+ /**
+ * Tests new MapReduce reduce task's context.getProgress() method.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void testReduceContextProgress()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ int numTasks = 1;
+ Path test = new Path(testRootTempDir, "testReduceContextProgress");
+
+ Job job = MapReduceTestUtil.createJob(createJobConf(),
+ new Path(test, "in"), new Path(test, "out"), numTasks, numTasks,
+ INPUT);
+ job.setMapperClass(ProgressCheckerMapper.class);
+ job.setReducerClass(ProgressCheckerReducer.class);
+ job.setMapOutputKeyClass(Text.class);
+
+ // fail early
+ job.setMaxMapAttempts(1);
+ job.setMaxReduceAttempts(1);
+
+ job.waitForCompletion(true);
+ assertTrue("Job failed", job.isSuccessful());
}
}