You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:09:46 UTC
svn commit: r1077385 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
test/org/apache/hadoop/mapreduce/
Author: omalley
Date: Fri Mar 4 04:09:46 2011
New Revision: 1077385
URL: http://svn.apache.org/viewvc?rev=1077385&view=rev
Log:
commit 4a02fcddfee9e690371fad590b9f23e5140602ac
Author: Vinod Kumar <vi...@yahoo-inc.com>
Date: Mon Apr 12 11:10:09 2010 +0530
MAPREDUCE-1635 from https://issues.apache.org/jira/secure/attachment/12441448/patch-1635-ydist.txt
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java Fri Mar 4 04:09:46 2011
@@ -26,6 +26,12 @@ import org.apache.hadoop.fs.Path;
/**
* Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to locate the directories that
+ * they write intermediate files to and read them from. Its callers run in the
+ * child task JVM, where the local directory (mapred.local.dir; renamed
+ * mapreduce.cluster.local.dir in later releases) is taskTracker/jobCache/jobId/attemptId.
+ * This class should not be used from within the TaskTracker itself.
*/
class MapOutputFile {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 04:09:46 2011
@@ -805,8 +805,12 @@ abstract public class Task implements Wr
}
}
+ /**
+ * Sends last status update before sending umbilical.done();
+ */
private void sendLastUpdate(TaskUmbilicalProtocol umbilical)
throws IOException {
+ taskStatus.setOutputSize(calculateOutputSize());
// send a final status report
taskStatus.statusUpdate(taskProgress.get(),
taskProgress.toString(),
@@ -814,6 +818,28 @@ abstract public class Task implements Wr
statusUpdate(umbilical);
}
+ /**
+ * Calculates the size of the intermediate output of this task.
+ *
+ * @return the length of the map output file for a map task of a job with
+ * reduces; -1 for reduce tasks, map-only jobs, tasks for which
+ * isMapOrReduce() is false, or when the file size cannot be read.
+ */
+ private long calculateOutputSize() {
+ // only maps of jobs with reduces have a local map output file to measure
+ if (!isMapOrReduce() || !isMapTask() || conf.getNumReduceTasks() <= 0) {
+ return -1;
+ }
+ try {
+ Path mapOutput = mapOutputFile.getOutputFile();
+ FileSystem localFS = FileSystem.getLocal(conf);
+ return localFS.getFileStatus(mapOutput).getLen();
+ } catch (IOException e) {
+ LOG.warn("Could not find output size ", e);
+ return -1;
+ }
+ }
+
private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
int retries = MAX_RETRIES;
while (true) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Mar 4 04:09:46 2011
@@ -54,7 +54,7 @@ public abstract class TaskStatus impleme
private long startTime;
private long finishTime;
- private long outputSize;
+ private long outputSize = -1L;
private volatile Phase phase = Phase.STARTING;
private Counters counters;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 04:09:46 2011
@@ -1948,44 +1948,6 @@ public class TaskTracker
return biggestSeenSoFar;
}
- /**
- * Try to get the size of output for this task.
- * Returns -1 if it can't be found.
- * @return
- */
- long tryToGetOutputSize(TaskAttemptID taskId, JobConf conf) {
-
- try{
- TaskInProgress tip;
- synchronized(this) {
- tip = tasks.get(taskId);
- }
- if(tip == null)
- return -1;
-
- if (!tip.getTask().isMapTask() ||
- tip.getRunState() != TaskStatus.State.SUCCEEDED) {
- return -1;
- }
-
- MapOutputFile mapOutputFile = new MapOutputFile();
- mapOutputFile.setConf(conf);
-
- Path tmp_output = mapOutputFile.getOutputFile();
- if(tmp_output == null)
- return 0;
- FileSystem localFS = FileSystem.getLocal(conf);
- FileStatus stat = localFS.getFileStatus(tmp_output);
- if(stat == null)
- return 0;
- else
- return stat.getLen();
- } catch(IOException e) {
- LOG.info(e);
- return -1;
- }
- }
-
private TaskLauncher mapLauncher;
private TaskLauncher reduceLauncher;
public JvmManager getJvmManagerInstance() {
@@ -3268,7 +3230,6 @@ public class TaskTracker
for(TaskInProgress tip: runningTasks.values()) {
TaskStatus status = tip.getStatus();
status.setIncludeCounters(sendCounters);
- status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf));
// send counters for finished or failed tasks and commit pending tasks
if (status.getRunState() != TaskStatus.State.RUNNING) {
status.setIncludeCounters(true);
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java?rev=1077385&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java Fri Mar 4 04:09:46 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.junit.After;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskOutputSize {
+ private static final Path rootDir = new Path(System.getProperty("test.build.data",
+ "/tmp"), "test");
+
+ @After
+ public void tearDown() throws Exception {
+ FileUtil.fullyDelete(new File(rootDir.toString()));
+ }
+
+ /**
+ * Runs a job with reduces, a map-only job and a failing job, and checks the
+ * output size reported in the TaskStatus of each completed attempt.
+ */
+ @Test
+ public void testTaskOutputSize() throws Exception {
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
+ try {
+ Path inDir = new Path(rootDir, "input");
+ Path outDir = new Path(rootDir, "output");
+ Job job = MapReduceTestUtil.createJob(mr.createJobConf(), inDir, outDir, 1, 1);
+ job.waitForCompletion(true);
+ assertTrue("Job failed", job.isSuccessful());
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+ // maps of a job with reduces must report their map output size
+ checkOutputSizes(job, jt, true);
+
+ // test output sizes for job with no reduces
+ job = MapReduceTestUtil.createJob(mr.createJobConf(), inDir, outDir, 1, 0);
+ job.waitForCompletion(true);
+ assertTrue("Job failed", job.isSuccessful());
+ checkOutputSizes(job, jt, false);
+
+ // test output sizes for failed job
+ job = MapReduceTestUtil.createFailJob(mr.createJobConf(), outDir, inDir);
+ job.waitForCompletion(true);
+ assertFalse("Job not failed", job.isSuccessful());
+ checkOutputSizes(job, jt, false);
+ } finally {
+ // always shut the mini cluster down, even if an assertion above failed
+ mr.shutdown();
+ }
+ }
+
+ /**
+ * Asserts that every completed attempt of the job reports an output size of
+ * -1, except map attempts when expectMapOutput is set, which must report a
+ * positive size.
+ */
+ private static void checkOutputSizes(Job job, JobTracker jt,
+ boolean expectMapOutput) throws Exception {
+ for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+ TaskStatus ts = jt.getTaskStatus(tce.getTaskAttemptId());
+ if (expectMapOutput && tce.isMapTask()) {
+ assertTrue(
+ "map output size is not found for " + tce.getTaskAttemptId(), ts
+ .getOutputSize() > 0);
+ } else {
+ assertEquals("task output size not expected for "
+ + tce.getTaskAttemptId(), -1, ts.getOutputSize());
+ }
+ }
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Mar 4 04:09:46 2011
@@ -152,6 +152,10 @@ public class MapReduceTestUtil {
public static Job createFailJob(Configuration conf, Path outdir,
Path... indirs) throws Exception {
+ FileSystem fs = outdir.getFileSystem(conf);
+ if (fs.exists(outdir)) {
+ fs.delete(outdir, true);
+ }
conf.setInt("mapred.map.max.attempts", 2);
Job theJob = new Job(conf);
theJob.setJobName("Fail-Job");