You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:55:41 UTC

svn commit: r1079206 - in /hadoop/mapreduce/branches/yahoo-merge/src: java/org/apache/hadoop/mapred/ test/mapred/org/apache/hadoop/mapred/ test/mapred/org/apache/hadoop/mapreduce/

Author: omalley
Date: Tue Mar  8 05:55:40 2011
New Revision: 1079206

URL: http://svn.apache.org/viewvc?rev=1079206&view=rev
Log:
commit bfdfeb39fbc08b33bc9cb0e805e0c6167a5d7b81
Author: Greg Roelofs <ro...@yahoo-inc.com>
Date:   Wed Dec 8 16:43:20 2010 -0800

    Fix for map-only jobs (commitTask() call), and fixes for 5 of the failing
    unit tests (~30 to go...).  This corresponds to the version 8-delta patch.

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar  8 05:55:40 2011
@@ -884,8 +884,8 @@ abstract public class Task implements Wr
           }
         }
       }
-      //wait for commit approval and commit
-      commit(umbilical, reporter, committer);
+      // wait for commit approval (via JT commitAction for this task) and commit
+      commit(umbilical, reporter, true);
     }
     taskDone.set(true);
     reporter.stopCommunicationThread();
@@ -1004,33 +1004,36 @@ abstract public class Task implements Wr
     }
   }
 
-  private void commit(TaskUmbilicalProtocol umbilical,
-                      TaskReporter reporter,
-                      org.apache.hadoop.mapreduce.OutputCommitter committer
-                      ) throws IOException {
+  // this is protected (rather than private) solely for UberTask map-only case
+  protected void commit(TaskUmbilicalProtocol umbilical,
+                        TaskReporter reporter,
+                        boolean queryTTBeforeCommit) throws IOException {
     int retries = MAX_RETRIES;
-    while (true) {
-      try {
-        while (!umbilical.canCommit(taskIdForUmbilical)) {
-          try {
-            Thread.sleep(1000);
-          } catch(InterruptedException ie) {
-            //ignore
+    if (queryTTBeforeCommit) {
+      while (true) {
+        try {
+          while (!umbilical.canCommit(taskIdForUmbilical)) {
+            try {
+              // FIXME 1:  shouldn't this count down retries, too, in case JT glitched and no longer knows about us?  (else infinite loop)
+              Thread.sleep(1000);  // FIXME 2:  shouldn't hardcoded 1-second sleep instead correspond to heartbeat interval for task?
+            } catch(InterruptedException ie) {
+              //ignore
+            }
+            reporter.setProgressFlag();
+          }
+          break;
+        } catch (IOException ie) {
+          LOG.warn("Failure asking whether task can commit: " +
+              StringUtils.stringifyException(ie));
+          if (--retries == 0) {
+            //if it couldn't query successfully then delete the output
+            discardOutput(taskContext);
+            System.exit(68);
           }
-          reporter.setProgressFlag();
-        }
-        break;
-      } catch (IOException ie) {
-        LOG.warn("Failure asking whether task can commit: " + 
-            StringUtils.stringifyException(ie));
-        if (--retries == 0) {
-          //if it couldn't query successfully then delete the output
-          discardOutput(taskContext);
-          System.exit(68);
         }
       }
     }
-    
+
     // task can Commit now  
     try {
       LOG.info("Task " + taskId + " is allowed to commit now");

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java Tue Mar  8 05:55:40 2011
@@ -291,11 +291,21 @@ class UberTask extends Task {
       // no one calls phase() on parent Progress (or get()?) in interim.
       map.getProgress().complete();
 
-      // Signal the communication thread to pass any progress on up to the TT.
-      // (This and the renameMapOutputForReduce() optimization below are the
-      // sole bits of output committer's "commitTask()" method that we actually
-      // want/need, so consider the subtask committed at this point.)
-      reporter.progress();
+      if (numReduceTasks == 0) {
+        // For map-only jobs, we need to save (commit) each map's output, which
+        // mainly entails moving it up two subdirectory levels in HDFS (i.e.,
+        // out of _temporary/attempt_xxx); passing false skips the canCommit()
+        // permission query (for speculation).  Use UberTask's reporter so we
+        // set the progressFlag to which the communication thread is paying
+        // attention.  (It knows nothing about subReporter.)
+        map.commit(umbilical, reporter, false);
+      } else {
+        // For map+reduce or reduce-only jobs, we merely need to signal the
+        // communication thread to pass any progress on up to the TT.  This
+        // and the renameMapOutputForReduce() optimization below are the sole
+        // bits of the commit() method that we actually want/need.
+        reporter.progress();
+      }
 
       // every map will produce file.out (in the same dir), so rename as we go
       // (longer-term, will use TaskAttemptIDs as part of name => avoid rename)

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java Tue Mar  8 05:55:40 2011
@@ -424,26 +424,34 @@ public class TestMiniMRChildTask extends
     //  - append to a new var (NEW_PATH)
     String mapTaskEnvKey = JobConf.MAPRED_MAP_TASK_ENV;
     String reduceTaskEnvKey = JobConf.MAPRED_MAP_TASK_ENV;
+    String uberTaskEnvKey = JobConf.MAPRED_UBER_TASK_ENV;
     String mapTaskJavaOptsKey = JobConf.MAPRED_MAP_TASK_JAVA_OPTS;
     String reduceTaskJavaOptsKey = JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS;
+    String uberTaskJavaOptsKey = JobConf.MAPRED_UBER_TASK_JAVA_OPTS;
     String mapTaskJavaOpts = MAP_OPTS_VAL;
     String reduceTaskJavaOpts = REDUCE_OPTS_VAL;
+    String uberTaskJavaOpts = REDUCE_OPTS_VAL; // larger of MAP and REDUCE VALs
     conf.setBoolean(OLD_CONFIGS, oldConfigs);
     if (oldConfigs) {
       mapTaskEnvKey = reduceTaskEnvKey = JobConf.MAPRED_TASK_ENV;
       mapTaskJavaOptsKey = reduceTaskJavaOptsKey = JobConf.MAPRED_TASK_JAVA_OPTS;
       mapTaskJavaOpts = reduceTaskJavaOpts = TASK_OPTS_VAL;
     }
-    conf.set(mapTaskEnvKey, 
+    conf.set(mapTaskEnvKey,
              "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
              "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
-    conf.set(reduceTaskEnvKey, 
+    conf.set(reduceTaskEnvKey,
+             "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
+             "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
+    conf.set(uberTaskEnvKey,
              "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
              "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
     conf.set("path", System.getenv("PATH"));
     conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts);
     conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts);
+    conf.set(uberTaskJavaOptsKey, uberTaskJavaOpts);
     RunningJob job = JobClient.runJob(conf);
+    // this is always true since runJob() throws if !job.isSuccessful() :
     assertTrue("The environment checker job failed.", job.isSuccessful());
   }
   

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Tue Mar  8 05:55:40 2011
@@ -94,6 +94,7 @@ public class TestMiniMRDFSSort extends T
     job.setInt(JobContext.JVM_NUMTASKS_TORUN, -1);
     job.setInt(JobContext.IO_SORT_MB, 1);
     job.setNumMapTasks(12);
+    job.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
 
     // Setup command-line arguments to 'sort'
     String[] sortArgs = {sortInput.toString(), sortOutput.toString()};

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Tue Mar  8 05:55:40 2011
@@ -40,11 +40,12 @@ public class TestReduceFetch extends Tes
     job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.05");
     job.setInt(JobContext.IO_SORT_FACTOR, 2);
     job.setInt(JobContext.REDUCE_MERGE_INMEM_THRESHOLD, 4);
+    job.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
     Counters c = runJob(job);
     final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
     final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
-    assertTrue("Expected all records spilled during reduce (" + spill + ")",
-        spill >= 2 * out); // all records spill at map, reduce
+    assertTrue("Expected all records spilled during reduce (" + spill + " >= 2*"
+        + out + ")", spill >= 2 * out); // all records spill at map, reduce
     assertTrue("Expected intermediate merges (" + spill + ")",
         spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
   }
@@ -60,6 +61,7 @@ public class TestReduceFetch extends Tes
     job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "1.0");
     job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
     job.setNumMapTasks(MAP_TASKS);
+    job.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
     Counters c = runJob(job);
     final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
     final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java Tue Mar  8 05:55:40 2011
@@ -89,11 +89,12 @@ public class TestReduceFetchFromPartialM
     job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
     job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14");
     job.set(JobContext.SHUFFLE_MERGE_EPRCENT, "1.0");
+    job.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
     Counters c = runJob(job);
     final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
     final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
-    assertTrue("Expected some records not spilled during reduce" + spill + ")",
-        spill < 2 * out); // spilled map records, some records at the reduce
+    assertTrue("Expected some records not spilled during reduce (" + spill +
+        " < 2*" + out + ")", spill < 2 * out); // spilled map records, some records at the reduce
   }
 
   /**

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Mar  8 05:55:40 2011
@@ -138,13 +138,15 @@ public class TestMapReduceLocal extends 
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);
     job.setInputFormatClass(TrackingTextInputFormat.class);
+    conf.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
     assertTrue(job.waitForCompletion(false));
     String out = readFile("out/part-r-00000");
     System.out.println(out);
-    assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
-                 out);
+    assertEquals(
+        "a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
+        out);
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
     long mapIn = ctrs.findCounter(FileInputFormat.COUNTER_GROUP, 
@@ -170,8 +172,8 @@ public class TestMapReduceLocal extends 
     assertEquals("combine out = reduce in", combineOut, reduceIn);
     assertTrue("combine in > combine out", combineIn > combineOut);
     assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);
-    assertEquals("Mismatch in mergedMapOutputs", mergedMapOutputs, 2);
-    assertEquals("Mismatch in shuffledMaps", shuffledMaps, 2);
+    assertEquals("Mismatch in mergedMapOutputs", 2, mergedMapOutputs);
+    assertEquals("Mismatch in shuffledMaps", 2, shuffledMaps);
     String group = "Random Group";
     CounterGroup ctrGrp = ctrs.getGroup(group);
     assertEquals(0, ctrGrp.size());