You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/07/22 17:24:49 UTC
svn commit: r796765 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Author: ddas
Date: Wed Jul 22 15:24:48 2009
New Revision: 796765
URL: http://svn.apache.org/viewvc?rev=796765&view=rev
Log:
MAPREDUCE-743. Fixes a problem to do with progress reporting in the map phase. Contributed by Ravi Gummadi.
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 22 15:24:48 2009
@@ -254,3 +254,5 @@
MAPREDUCE-682. Removes reservations on tasktrackers which are
blacklisted. (Sreekanth Ramakrishnan via yhemanth)
+ MAPREDUCE-743. Fixes a problem to do with progress reporting
+ in the map phase. (Ravi Gummadi via ddas)
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Wed Jul 22 15:24:48 2009
@@ -51,7 +51,7 @@
private static final Log LOG =
LogFactory.getLog(IsolationRunner.class.getName());
- private static class FakeUmbilical implements TaskUmbilicalProtocol {
+ static class FakeUmbilical implements TaskUmbilicalProtocol {
public long getProtocolVersion(String protocol, long clientVersion) {
return TaskUmbilicalProtocol.versionID;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Jul 22 15:24:48 2009
@@ -56,6 +56,32 @@
return JobSubmissionProtocol.versionID;
}
+ /**
+  * Computes the input splits of a new-API job and converts each one into a
+  * serialized {@link RawSplit}.
+  *
+  * The input format named by <code>jContext</code> is instantiated with the
+  * context's own JobConf and asked for its splits. Every split is written
+  * through a serializer obtained for the class of the first split, and its
+  * class name, length, bytes and locations are recorded in the RawSplit.
+  *
+  * @param jContext supplies the input format class and the JobConf used for
+  *                 instantiation and serialization
+  * @param job      not read by this method; retained so that existing
+  *                 callers keep compiling
+  * @return one RawSplit per input split, in the order the input format
+  *         returned them; an empty array when there are no splits
+  * @throws Exception if the input format cannot compute its splits or a
+  *                   split cannot be serialized
+  */
+ static RawSplit[] getRawSplits(JobContext jContext, JobConf job)
+     throws Exception {
+   JobConf jobConf = jContext.getJobConf();
+   org.apache.hadoop.mapreduce.InputFormat<?,?> input =
+     ReflectionUtils.newInstance(jContext.getInputFormatClass(), jobConf);
+
+   List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(jContext);
+   RawSplit[] rawSplits = new RawSplit[splits.size()];
+   if (splits.isEmpty()) {
+     // There is no first element to pick a serializer from; splits.get(0)
+     // would throw IndexOutOfBoundsException.
+     return rawSplits;
+   }
+
+   DataOutputBuffer buffer = new DataOutputBuffer();
+   SerializationFactory factory = new SerializationFactory(jobConf);
+   // The serializer is chosen from the first split's class and reused for
+   // all of them.
+   Serializer serializer =
+     factory.getSerializer(splits.get(0).getClass());
+   serializer.open(buffer);
+   try {
+     for (int i = 0; i < splits.size(); i++) {
+       org.apache.hadoop.mapreduce.InputSplit split = splits.get(i);
+       // Reuse the same buffer: reset it so it holds only the current split.
+       buffer.reset();
+       serializer.serialize(split);
+       RawSplit rawSplit = new RawSplit();
+       rawSplit.setClassName(split.getClass().getName());
+       rawSplit.setDataLength(split.getLength());
+       rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+       rawSplit.setLocations(split.getLocations());
+       rawSplits[i] = rawSplit;
+     }
+   } finally {
+     // Release the serializer even if a split fails to serialize.
+     serializer.close();
+   }
+   return rawSplits;
+ }
+
private class Job extends Thread
implements TaskUmbilicalProtocol {
private Path file;
@@ -115,27 +141,7 @@
// split input into minimum number of splits
RawSplit[] rawSplits;
if (job.getUseNewMapper()) {
- org.apache.hadoop.mapreduce.InputFormat<?,?> input =
- ReflectionUtils.newInstance(jContext.getInputFormatClass(), jContext.getJobConf());
-
- List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(jContext);
- rawSplits = new RawSplit[splits.size()];
- DataOutputBuffer buffer = new DataOutputBuffer();
- SerializationFactory factory = new SerializationFactory(conf);
- Serializer serializer =
- factory.getSerializer(splits.get(0).getClass());
- serializer.open(buffer);
- for (int i = 0; i < splits.size(); i++) {
- buffer.reset();
- serializer.serialize(splits.get(i));
- RawSplit rawSplit = new RawSplit();
- rawSplit.setClassName(splits.get(i).getClass().getName());
- rawSplit.setDataLength(splits.get(i).getLength());
- rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
- rawSplit.setLocations(splits.get(i).getLocations());
- rawSplits[i] = rawSplit;
- }
-
+ rawSplits = getRawSplits(jContext, job);
} else {
InputSplit[] splits = job.getInputFormat().getSplits(job, 1);
rawSplits = new RawSplit[splits.length];
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Jul 22 15:24:48 2009
@@ -188,10 +188,10 @@
protected synchronized boolean moveToNext(K key, V value)
throws IOException {
- reporter.setProgress(getProgress());
beforePos = getPos();
boolean ret = rawIn.next(key, value);
afterPos = getPos();
+ reporter.setProgress(getProgress());
return ret;
}
@@ -285,10 +285,8 @@
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
-
- // start thread that will handle communication with parent
- TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
- reporter.startCommunicationThread();
+ TaskReporter reporter = startReporter(umbilical);
+
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jul 22 15:24:48 2009
@@ -346,8 +346,8 @@
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
- TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
- reporter.startCommunicationThread();
+ TaskReporter reporter = startReporter(umbilical);
+
boolean useNewApi = job.getUseNewReducer();
initialize(job, getJobID(), reporter, useNewApi);
@@ -539,8 +539,7 @@
}
public boolean next() throws IOException {
boolean ret = rawIter.next();
- reducePhase.set(rawIter.getProgress().getProgress());
- reporter.progress();
+ reporter.setProgress(rawIter.getProgress().getProgress());
return ret;
}
};
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jul 22 15:24:48 2009
@@ -427,7 +427,9 @@
setProgressFlag();
}
public void setProgress(float progress) {
- taskProgress.set(progress);
+ // set current phase progress.
+ // This method assumes that task has phases.
+ taskProgress.phase().set(progress);
// indicate that progress update needs to be sent
setProgressFlag();
}
@@ -571,6 +573,16 @@
}
/**
+ * Create a TaskReporter and start communication thread.
+ *
+ * The reporter is built from this task's progress object and the given
+ * umbilical. The method is package-private and non-final, so a subclass in
+ * this package can override it to supply a different reporter.
+ *
+ * @param umbilical the protocol object handed to the TaskReporter
+ * @return the new reporter, after startCommunicationThread() has been
+ *         called on it
+ */
+ TaskReporter startReporter(final TaskUmbilicalProtocol umbilical) {
+ // start thread that will handle communication with parent
+ TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
+ reporter.startCommunicationThread();
+ return reporter;
+ }
+
+ /**
* An updater that tracks the last number reported for a given file
* system and only creates the counters when they are needed.
*/
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=796765&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Wed Jul 22 15:24:48 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapred.JobClient.RawSplit;
+
+/**
+ * Validates map phase progress.
+ * Testcase uses newApi.
+ * We extend Task.TaskReporter class and override setProgress()
+ * to validate the map phase progress being set.
+ * We extend MapTask and override startReporter() method that creates
+ * TestTaskReporter instead of TaskReporter and call mapTask.run().
+ * Similar to LocalJobRunner, we set up splits and call mapTask.run()
+ * directly. No job is run, only map task is run.
+ * We use IsolationRunner.FakeUmbilical.
+ * As the reporter's setProgress() validates progress after
+ * every record is read, we are done with the validation of map phase progress
+ * once mapTask.run() is finished. Sort phase progress in map task is not
+ * validated here.
+ */
+public class TestMapProgress extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestMapProgress.class);
+
+  /** Scratch directory holding the generated input file. */
+  private static final String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")).getAbsolutePath() + "/mapPahseprogress";
+
+  /** Number of records written by {@link #createInputFile(Path)}. */
+  private static final int NUM_RECORDS = 3;
+
+  /**
+   * Share of the overall task progress that the test expects the map phase
+   * to contribute once all records are read.
+   */
+  private static final double MAP_PHASE_WEIGHT = 0.667;
+
+  private FileSystem fs = null;
+  private TestMapTask map = null;
+  private JobID jobId = null;
+  private IsolationRunner.FakeUmbilical fakeUmbilical =
+      new IsolationRunner.FakeUmbilical();
+
+  /**
+   * Task Reporter that validates map phase progress after each record is
+   * processed by map task
+   */
+  public class TestTaskReporter extends Task.TaskReporter {
+    private int recordNum = 0; // number of records processed
+
+    TestTaskReporter(Task task) {
+      task.super(task.getProgress(), fakeUmbilical);
+    }
+
+    /**
+     * Forwards the progress to the real reporter and then checks the overall
+     * progress of the map task under test. Only the first
+     * {@link #NUM_RECORDS} calls are validated; later calls are just logged.
+     */
+    @Override
+    public void setProgress(float progress) {
+      super.setProgress(progress);
+      float mapTaskProgress = map.getProgress().getProgress();
+      LOG.info("Map task progress is " + mapTaskProgress);
+      if (recordNum >= NUM_RECORDS) {
+        // only 3 records are there; Ignore validating progress after 3 times
+        return;
+      }
+      recordNum++;
+      // validate map task progress when the map task is in map phase:
+      // each of the equally sized records should add the same share
+      double expected = (MAP_PHASE_WEIGHT / NUM_RECORDS) * recordNum;
+      assertTrue("Map progress is not the expected value.",
+          Math.abs(mapTaskProgress - expected) < 0.001);
+    }
+  }
+
+  /**
+   * Map Task that overrides startReporter() and uses TestTaskReporter
+   * instead of TaskReporter and uses FakeUmbilical.
+   */
+  class TestMapTask extends MapTask {
+    public TestMapTask(String jobFile, TaskAttemptID taskId,
+        int partition, String splitClass, BytesWritable split,
+        int numSlotsRequired) {
+      super(jobFile, taskId, partition, splitClass, split, numSlotsRequired);
+    }
+
+    /**
+     * Create a TestTaskReporter and use it for validating map phase progress
+     */
+    @Override
+    TaskReporter startReporter(final TaskUmbilicalProtocol umbilical) {
+      // Unlike the overridden method, no communication thread is started
+      // here; the reporter is only used to observe setProgress() calls.
+      TaskReporter reporter = new TestTaskReporter(map);
+      return reporter;
+    }
+  }
+
+  // In the given dir, creates part-0 file with 3 records of same size
+  private void createInputFile(Path rootDir) throws IOException {
+    if (fs.exists(rootDir)) {
+      fs.delete(rootDir, true);
+    }
+
+    String str = "The quick brown fox\n" + "The brown quick fox\n"
+        + "The fox brown quick\n";
+    DataOutputStream inpFile = fs.create(new Path(rootDir, "part-0"));
+    try {
+      inpFile.writeBytes(str);
+    } finally {
+      // close the stream even if the write fails
+      inpFile.close();
+    }
+  }
+
+  /**
+   * Validates map phase progress after each record is processed by map task
+   * using custom task reporter.
+   */
+  public void testMapProgress() throws Exception {
+    JobConf job = new JobConf();
+    fs = FileSystem.getLocal(job);
+    Path rootDir = new Path(TEST_ROOT_DIR);
+    try {
+      createInputFile(rootDir);
+
+      job.setNumReduceTasks(0);
+      TaskAttemptID taskId = TaskAttemptID.forName(
+          "attempt_200907082313_0424_m_000000_0");
+      job.setClass("mapreduce.outputformat.class",
+          NullOutputFormat.class, OutputFormat.class);
+      job.set("mapred.input.dir", TEST_ROOT_DIR);
+      jobId = taskId.getJobID();
+
+      JobContext jContext = new JobContext(job, jobId);
+      RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
+
+      job.setUseNewMapper(true); // use new api
+      for (int i = 0; i < rawSplits.length; i++) {// rawSplits.length is 1
+        // NOTE(review): the job file name is built without path separators
+        // between its parts - confirm whether that is intended.
+        map = new TestMapTask(
+            job.get("mapred.system.dir", "/tmp/hadoop/mapred/system") +
+            jobId + "job.xml",
+            taskId, i,
+            rawSplits[i].getClassName(),
+            rawSplits[i].getBytes(), 1);
+
+        JobConf localConf = new JobConf(job);
+        map.localizeConfiguration(localConf);
+        map.setConf(localConf);
+        map.run(localConf, fakeUmbilical);
+      }
+    } finally {
+      // clean up, also when an assertion or the map task itself fails
+      fs.delete(rootDir, true);
+    }
+  }
+}