You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:09:46 UTC

svn commit: r1077385 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapreduce/

Author: omalley
Date: Fri Mar  4 04:09:46 2011
New Revision: 1077385

URL: http://svn.apache.org/viewvc?rev=1077385&view=rev
Log:
commit 4a02fcddfee9e690371fad590b9f23e5140602ac
Author: Vinod Kumar <vi...@yahoo-inc.com>
Date:   Mon Apr 12 11:10:09 2010 +0530

    MAPREDUCE-1635 from https://issues.apache.org/jira/secure/attachment/12441448/patch-1635-ydist.txt

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java Fri Mar  4 04:09:46 2011
@@ -26,6 +26,12 @@ import org.apache.hadoop.fs.Path;
 
 /**
  * Manipulate the working area for the transient store for maps and reduces.
+ * 
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods run in child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId.
+ * This class should not be used from TaskTracker space.
  */ 
 class MapOutputFile {
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 04:09:46 2011
@@ -805,8 +805,12 @@ abstract public class Task implements Wr
     }
   }
   
+  /**
+   * Sends the last status update before calling umbilical.done().
+   */
   private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
   throws IOException {
+    taskStatus.setOutputSize(calculateOutputSize());
     // send a final status report
     taskStatus.statusUpdate(taskProgress.get(),
                             taskProgress.toString(), 
@@ -814,6 +818,28 @@ abstract public class Task implements Wr
     statusUpdate(umbilical);
   }
 
+  /**
+   * Calculates the size of output for this task.
+   * 
+   * @return length of the map output file, or -1 if unavailable/not applicable.
+   */
+   private long calculateOutputSize() throws IOException {
+    if (!isMapOrReduce()) {
+      return -1;
+    }
+
+    if (isMapTask() && conf.getNumReduceTasks() > 0) {
+      try {
+        Path mapOutput =  mapOutputFile.getOutputFile();
+        FileSystem localFS = FileSystem.getLocal(conf);
+        return localFS.getFileStatus(mapOutput).getLen();
+      } catch (IOException e) {
+        LOG.warn ("Could not find output size " , e);
+      }
+    }
+    return -1;
+  }
+
   private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
     int retries = MAX_RETRIES;
     while (true) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Mar  4 04:09:46 2011
@@ -54,7 +54,7 @@ public abstract class TaskStatus impleme
     
   private long startTime; 
   private long finishTime; 
-  private long outputSize;
+  private long outputSize = -1L;
     
   private volatile Phase phase = Phase.STARTING; 
   private Counters counters;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 04:09:46 2011
@@ -1948,44 +1948,6 @@ public class TaskTracker 
     return biggestSeenSoFar;
   }
     
-  /**
-   * Try to get the size of output for this task.
-   * Returns -1 if it can't be found.
-   * @return
-   */
-  long tryToGetOutputSize(TaskAttemptID taskId, JobConf conf) {
-    
-    try{
-      TaskInProgress tip;
-      synchronized(this) {
-        tip = tasks.get(taskId);
-      }
-      if(tip == null)
-         return -1;
-      
-      if (!tip.getTask().isMapTask() || 
-          tip.getRunState() != TaskStatus.State.SUCCEEDED) {
-        return -1;
-      }
-      
-      MapOutputFile mapOutputFile = new MapOutputFile();
-      mapOutputFile.setConf(conf);
-      
-      Path tmp_output =  mapOutputFile.getOutputFile();
-      if(tmp_output == null)
-        return 0;
-      FileSystem localFS = FileSystem.getLocal(conf);
-      FileStatus stat = localFS.getFileStatus(tmp_output);
-      if(stat == null)
-        return 0;
-      else
-        return stat.getLen();
-    } catch(IOException e) {
-      LOG.info(e);
-      return -1;
-    }
-  }
-
   private TaskLauncher mapLauncher;
   private TaskLauncher reduceLauncher;
   public JvmManager getJvmManagerInstance() {
@@ -3268,7 +3230,6 @@ public class TaskTracker 
     for(TaskInProgress tip: runningTasks.values()) {
       TaskStatus status = tip.getStatus();
       status.setIncludeCounters(sendCounters);
-      status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf));
       // send counters for finished or failed tasks and commit pending tasks
       if (status.getRunState() != TaskStatus.State.RUNNING) {
         status.setIncludeCounters(true);

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java?rev=1077385&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskOutputSize.java Fri Mar  4 04:09:46 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.junit.After;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskOutputSize {
+  private static Path rootDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), "test");
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtil.fullyDelete(new File(rootDir.toString()));
+  }
+
+  @Test
+  public void testTaskOutputSize() throws Exception {
+    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
+    Path inDir = new Path(rootDir, "input");
+    Path outDir = new Path(rootDir, "output");
+    Job job = MapReduceTestUtil.createJob(mr.createJobConf(), inDir, outDir, 1, 1);
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+      TaskStatus ts = jt.getTaskStatus(tce.getTaskAttemptId());
+      if (tce.isMapTask()) {
+        assertTrue(
+            "map output size is not found for " + tce.getTaskAttemptId(), ts
+                .getOutputSize() > 0);
+      } else {
+        assertEquals("task output size not expected for "
+            + tce.getTaskAttemptId(), -1, ts.getOutputSize());
+      }
+    }
+
+    // test output sizes for job with no reduces
+    job = MapReduceTestUtil.createJob(mr.createJobConf(), inDir, outDir, 1, 0);
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+    for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+      TaskStatus ts = jt.getTaskStatus(tce.getTaskAttemptId());
+      assertEquals("task output size not expected for "
+          + tce.getTaskAttemptId(), -1, ts.getOutputSize());
+    }
+
+    // test output sizes for failed job
+    job = MapReduceTestUtil.createFailJob(mr.createJobConf(), outDir, inDir);
+    job.waitForCompletion(true);
+    assertFalse("Job not failed", job.isSuccessful());
+    for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+      TaskStatus ts = jt.getTaskStatus(tce.getTaskAttemptId());
+      assertEquals("task output size not expected for "
+          + tce.getTaskAttemptId(), -1, ts.getOutputSize());
+    }
+  }
+
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=1077385&r1=1077384&r2=1077385&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Mar  4 04:09:46 2011
@@ -152,6 +152,10 @@ public class MapReduceTestUtil {
   public static Job createFailJob(Configuration conf, Path outdir, 
       Path... indirs) throws Exception {
 
+    FileSystem fs = outdir.getFileSystem(conf);
+    if (fs.exists(outdir)) {
+      fs.delete(outdir, true);
+    }
     conf.setInt("mapred.map.max.attempts", 2);
     Job theJob = new Job(conf);
     theJob.setJobName("Fail-Job");