You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/07/22 17:24:49 UTC

svn commit: r796765 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/

Author: ddas
Date: Wed Jul 22 15:24:48 2009
New Revision: 796765

URL: http://svn.apache.org/viewvc?rev=796765&view=rev
Log:
MAPREDUCE-743. Fixes a problem to do with progress reporting in the map phase. Contributed by Ravi Gummadi.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 22 15:24:48 2009
@@ -254,3 +254,5 @@
     MAPREDUCE-682. Removes reservations on tasktrackers which are
     blacklisted. (Sreekanth Ramakrishnan via yhemanth)
 
+    MAPREDUCE-743. Fixes a problem to do with progress reporting
+    in the map phase. (Ravi Gummadi via ddas)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Wed Jul 22 15:24:48 2009
@@ -51,7 +51,7 @@
   private static final Log LOG = 
     LogFactory.getLog(IsolationRunner.class.getName());
 
-  private static class FakeUmbilical implements TaskUmbilicalProtocol {
+  static class FakeUmbilical implements TaskUmbilicalProtocol {
 
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Jul 22 15:24:48 2009
@@ -56,6 +56,32 @@
     return JobSubmissionProtocol.versionID;
   }
   
+  static RawSplit[] getRawSplits(JobContext jContext, JobConf job)
+      throws Exception {
+    JobConf jobConf = jContext.getJobConf();
+    org.apache.hadoop.mapreduce.InputFormat<?,?> input =
+      ReflectionUtils.newInstance(jContext.getInputFormatClass(), jobConf);
+
+    List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(jContext);
+    RawSplit[] rawSplits = new RawSplit[splits.size()];
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    SerializationFactory factory = new SerializationFactory(jobConf);
+    Serializer serializer = 
+      factory.getSerializer(splits.get(0).getClass());
+    serializer.open(buffer);
+    for (int i = 0; i < splits.size(); i++) {
+      buffer.reset();
+      serializer.serialize(splits.get(i));
+      RawSplit rawSplit = new RawSplit();
+      rawSplit.setClassName(splits.get(i).getClass().getName());
+      rawSplit.setDataLength(splits.get(i).getLength());
+      rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+      rawSplit.setLocations(splits.get(i).getLocations());
+      rawSplits[i] = rawSplit;
+    }
+    return rawSplits;
+  }
+
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
     private Path file;
@@ -115,27 +141,7 @@
         // split input into minimum number of splits
         RawSplit[] rawSplits;
         if (job.getUseNewMapper()) {
-          org.apache.hadoop.mapreduce.InputFormat<?,?> input =
-              ReflectionUtils.newInstance(jContext.getInputFormatClass(), jContext.getJobConf());
-                    
-          List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(jContext);
-          rawSplits = new RawSplit[splits.size()];
-          DataOutputBuffer buffer = new DataOutputBuffer();
-          SerializationFactory factory = new SerializationFactory(conf);
-          Serializer serializer = 
-            factory.getSerializer(splits.get(0).getClass());
-          serializer.open(buffer);
-          for (int i = 0; i < splits.size(); i++) {
-            buffer.reset();
-            serializer.serialize(splits.get(i));
-            RawSplit rawSplit = new RawSplit();
-            rawSplit.setClassName(splits.get(i).getClass().getName());
-            rawSplit.setDataLength(splits.get(i).getLength());
-            rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-            rawSplit.setLocations(splits.get(i).getLocations());
-            rawSplits[i] = rawSplit;
-          }
-
+          rawSplits = getRawSplits(jContext, job);
         } else {
           InputSplit[] splits = job.getInputFormat().getSplits(job, 1);
           rawSplits = new RawSplit[splits.length];

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Jul 22 15:24:48 2009
@@ -188,10 +188,10 @@
      
     protected synchronized boolean moveToNext(K key, V value)
       throws IOException {
-      reporter.setProgress(getProgress());
       beforePos = getPos();
       boolean ret = rawIn.next(key, value);
       afterPos = getPos();
+      reporter.setProgress(getProgress());
       return ret;
     }
     
@@ -285,10 +285,8 @@
       mapPhase = getProgress().addPhase("map", 0.667f);
       sortPhase  = getProgress().addPhase("sort", 0.333f);
     }
-    
-    // start thread that will handle communication with parent
-    TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
-    reporter.startCommunicationThread();
+    TaskReporter reporter = startReporter(umbilical);
+ 
     boolean useNewApi = job.getUseNewMapper();
     initialize(job, getJobID(), reporter, useNewApi);
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jul 22 15:24:48 2009
@@ -346,8 +346,8 @@
       reducePhase = getProgress().addPhase("reduce");
     }
     // start thread that will handle communication with parent
-    TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
-    reporter.startCommunicationThread();
+    TaskReporter reporter = startReporter(umbilical);
+    
     boolean useNewApi = job.getUseNewReducer();
     initialize(job, getJobID(), reporter, useNewApi);
 
@@ -539,8 +539,7 @@
       }
       public boolean next() throws IOException {
         boolean ret = rawIter.next();
-        reducePhase.set(rawIter.getProgress().getProgress());
-        reporter.progress();
+        reporter.setProgress(rawIter.getProgress().getProgress());
         return ret;
       }
     };

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=796765&r1=796764&r2=796765&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jul 22 15:24:48 2009
@@ -427,7 +427,9 @@
       setProgressFlag();
     }
     public void setProgress(float progress) {
-      taskProgress.set(progress);
+      // set current phase progress.
+      // This method assumes that task has phases.
+      taskProgress.phase().set(progress);
       // indicate that progress update needs to be sent
       setProgressFlag();
     }
@@ -571,6 +573,16 @@
   }
 
   /**
+   * Create a TaskReporter and start communication thread
+   */
+  TaskReporter startReporter(final TaskUmbilicalProtocol umbilical) {  
+    // start thread that will handle communication with parent
+    TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
+    reporter.startCommunicationThread();
+    return reporter;
+  }
+
+  /**
    * An updater that tracks the last number reported for a given file
    * system and only creates the counters when they are needed.
    */

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=796765&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Wed Jul 22 15:24:48 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapred.JobClient.RawSplit;
+
+/**
+ *  Validates map phase progress.
+ *  Testcase uses newApi.
+ *  We extend Task.TaskReporter class and override setProgress()
+ *  to validate the map phase progress being set.
+ *  We extend MapTask and override startReporter() method that creates
+ *  TestTaskReporter instead of TaskReporter and call mapTask.run().
+ *  Similar to LocalJobRunner, we set up splits and call mapTask.run()
+ *  directly. No job is run, only map task is run.
+ *  We use IsolationRunner.FakeUmbilical.
+ *  As the reporter's setProgress() validates progress after
+ *  every record is read, we are done with the validation of map phase progress
+ *  once mapTask.run() is finished. Sort phase progress in map task is not
+ *  validated here.
+ */
+public class TestMapProgress extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestMapProgress.class);
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+           "test.build.data", "/tmp")).getAbsolutePath() + "/mapPahseprogress";
+  
+  private FileSystem fs = null;
+  private TestMapTask map = null;
+  private JobID jobId = null;
+  private IsolationRunner.FakeUmbilical fakeUmbilical =
+                                        new IsolationRunner.FakeUmbilical();
+
+  /**
+   *  Task Reporter that validates map phase progress after each record is
+   *  processed by map task
+   */ 
+  public class TestTaskReporter extends Task.TaskReporter {
+    private int recordNum = 0; // number of records processed
+    TestTaskReporter(Task task) {
+      task.super(task.getProgress(), fakeUmbilical);
+    }
+
+    @Override
+    public void setProgress(float progress) {
+      super.setProgress(progress);
+      float mapTaskProgress = map.getProgress().getProgress();
+      LOG.info("Map task progress is " + mapTaskProgress);
+      if (recordNum < 3) {
+        // only 3 records are there; Ignore validating progress after 3 times
+        recordNum++;
+      }
+      else {
+        return;
+      }
+      // validate map task progress when the map task is in map phase
+      assertTrue("Map progress is not the expected value.",
+                 Math.abs(mapTaskProgress - ((0.667/3)*recordNum)) < 0.001);
+    }
+  }
+
+  /**
+   * Map Task that overrides run method and uses TestTaskReporter instead of
+   * TaskReporter and uses FakeUmbilical.
+   */
+  class TestMapTask extends MapTask {
+    public TestMapTask(String jobFile, TaskAttemptID taskId, 
+        int partition, String splitClass, BytesWritable split,
+        int numSlotsRequired) {
+      super(jobFile, taskId, partition, splitClass, split, numSlotsRequired);
+    }
+    
+    /**
+     * Create a TestTaskReporter and use it for validating map phase progress
+     */
+    @Override
+    TaskReporter startReporter(final TaskUmbilicalProtocol umbilical) {  
+      // start thread that will handle communication with parent
+      TaskReporter reporter = new TestTaskReporter(map);
+      return reporter;
+    }
+  }
+  
+  // In the given dir, creates part-0 file with 3 records of same size
+  private void createInputFile(Path rootDir) throws IOException {
+    if(fs.exists(rootDir)){
+      fs.delete(rootDir, true);
+    }
+    
+    String str = "The quick brown fox\n" + "The brown quick fox\n"
+    + "The fox brown quick\n";
+    DataOutputStream inpFile = fs.create(new Path(rootDir, "part-0"));
+    inpFile.writeBytes(str);
+    inpFile.close();
+  }
+
+  /**
+   *  Validates map phase progress after each record is processed by map task
+   *  using custom task reporter.
+   */ 
+  public void testMapProgress() throws Exception {
+    JobConf job = new JobConf();
+    fs = FileSystem.getLocal(job);
+    Path rootDir = new Path(TEST_ROOT_DIR);
+    createInputFile(rootDir);
+
+    job.setNumReduceTasks(0);
+    TaskAttemptID taskId = TaskAttemptID.forName(
+                                  "attempt_200907082313_0424_m_000000_0");
+    job.setClass("mapreduce.outputformat.class",
+                 NullOutputFormat.class, OutputFormat.class);
+    job.set("mapred.input.dir", TEST_ROOT_DIR);
+    jobId = taskId.getJobID();
+    
+    JobContext jContext = new JobContext(job, jobId);
+    RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
+
+    job.setUseNewMapper(true); // use new api
+    for (int i = 0; i < rawSplits.length; i++) {// rawSplits.length is 1
+      map = new TestMapTask(
+          job.get("mapred.system.dir", "/tmp/hadoop/mapred/system") +
+          jobId + "job.xml",  
+          taskId, i,
+          rawSplits[i].getClassName(),
+          rawSplits[i].getBytes(), 1);
+
+      JobConf localConf = new JobConf(job);
+      map.localizeConfiguration(localConf);
+      map.setConf(localConf);
+      map.run(localConf, fakeUmbilical);
+    }
+    // clean up
+    fs.delete(rootDir, true);
+  }
+}