You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:55:41 UTC
svn commit: r1079206 - in /hadoop/mapreduce/branches/yahoo-merge/src:
java/org/apache/hadoop/mapred/ test/mapred/org/apache/hadoop/mapred/
test/mapred/org/apache/hadoop/mapreduce/
Author: omalley
Date: Tue Mar 8 05:55:40 2011
New Revision: 1079206
URL: http://svn.apache.org/viewvc?rev=1079206&view=rev
Log:
commit bfdfeb39fbc08b33bc9cb0e805e0c6167a5d7b81
Author: Greg Roelofs <ro...@yahoo-inc.com>
Date: Wed Dec 8 16:43:20 2010 -0800
Fix map-only jobs (add the commitTask() call that UberTask needs for each map), and
fix 5 of the failing unit tests (~30 still to go). This corresponds to the version 8-delta patch.
Modified:
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar 8 05:55:40 2011
@@ -884,8 +884,8 @@ abstract public class Task implements Wr
}
}
}
- //wait for commit approval and commit
- commit(umbilical, reporter, committer);
+ // wait for commit approval (via JT commitAction for this task) and commit
+ commit(umbilical, reporter, true);
}
taskDone.set(true);
reporter.stopCommunicationThread();
@@ -1004,33 +1004,36 @@ abstract public class Task implements Wr
}
}
- private void commit(TaskUmbilicalProtocol umbilical,
- TaskReporter reporter,
- org.apache.hadoop.mapreduce.OutputCommitter committer
- ) throws IOException {
+ // this is protected (rather than private) solely for UberTask map-only case
+ protected void commit(TaskUmbilicalProtocol umbilical,
+ TaskReporter reporter,
+ boolean queryTTBeforeCommit) throws IOException {
int retries = MAX_RETRIES;
- while (true) {
- try {
- while (!umbilical.canCommit(taskIdForUmbilical)) {
- try {
- Thread.sleep(1000);
- } catch(InterruptedException ie) {
- //ignore
+ if (queryTTBeforeCommit) {
+ while (true) {
+ try {
+ while (!umbilical.canCommit(taskIdForUmbilical)) {
+ try {
+ // FIXME 1: shouldn't this count down retries, too, in case JT glitched and no longer knows about us? (else infinite loop)
+ Thread.sleep(1000); // FIXME 2: shouldn't hardcoded 1-second sleep instead correspond to heartbeat interval for task?
+ } catch(InterruptedException ie) {
+ //ignore
+ }
+ reporter.setProgressFlag();
+ }
+ break;
+ } catch (IOException ie) {
+ LOG.warn("Failure asking whether task can commit: " +
+ StringUtils.stringifyException(ie));
+ if (--retries == 0) {
+ //if it couldn't query successfully then delete the output
+ discardOutput(taskContext);
+ System.exit(68);
}
- reporter.setProgressFlag();
- }
- break;
- } catch (IOException ie) {
- LOG.warn("Failure asking whether task can commit: " +
- StringUtils.stringifyException(ie));
- if (--retries == 0) {
- //if it couldn't query successfully then delete the output
- discardOutput(taskContext);
- System.exit(68);
}
}
}
-
+
// task can Commit now
try {
LOG.info("Task " + taskId + " is allowed to commit now");
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java Tue Mar 8 05:55:40 2011
@@ -291,11 +291,21 @@ class UberTask extends Task {
// no one calls phase() on parent Progress (or get()?) in interim.
map.getProgress().complete();
- // Signal the communication thread to pass any progress on up to the TT.
- // (This and the renameMapOutputForReduce() optimization below are the
- // sole bits of output committer's "commitTask()" method that we actually
- // want/need, so consider the subtask committed at this point.)
- reporter.progress();
+ if (numReduceTasks == 0) {
+ // For map-only jobs, we need to save (commit) each map's output, which
+ // mainly entails asking the TT for permission (in case of speculation)
+ // and then moving it up two subdirectory levels in HDFS (i.e., out
+ // of _temporary/attempt_xxx). Use UberTask's reporter so we set the
+ // progressFlag to which the communication thread is paying attention.
+ // (It knows nothing about subReporter.)
+ map.commit(umbilical, reporter, false);
+ } else {
+ // For map+reduce or reduce-only jobs, we merely need to signal the
+ // communication thread to pass any progress on up to the TT. This
+ // and the renameMapOutputForReduce() optimization below are the sole
+ // bits of the commit() method that we actually want/need.
+ reporter.progress();
+ }
// every map will produce file.out (in the same dir), so rename as we go
// (longer-term, will use TaskAttemptIDs as part of name => avoid rename)
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java Tue Mar 8 05:55:40 2011
@@ -424,26 +424,34 @@ public class TestMiniMRChildTask extends
// - append to a new var (NEW_PATH)
String mapTaskEnvKey = JobConf.MAPRED_MAP_TASK_ENV;
String reduceTaskEnvKey = JobConf.MAPRED_MAP_TASK_ENV;
+ String uberTaskEnvKey = JobConf.MAPRED_UBER_TASK_ENV;
String mapTaskJavaOptsKey = JobConf.MAPRED_MAP_TASK_JAVA_OPTS;
String reduceTaskJavaOptsKey = JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS;
+ String uberTaskJavaOptsKey = JobConf.MAPRED_UBER_TASK_JAVA_OPTS;
String mapTaskJavaOpts = MAP_OPTS_VAL;
String reduceTaskJavaOpts = REDUCE_OPTS_VAL;
+ String uberTaskJavaOpts = REDUCE_OPTS_VAL; // larger of MAP and REDUCE VALs
conf.setBoolean(OLD_CONFIGS, oldConfigs);
if (oldConfigs) {
mapTaskEnvKey = reduceTaskEnvKey = JobConf.MAPRED_TASK_ENV;
mapTaskJavaOptsKey = reduceTaskJavaOptsKey = JobConf.MAPRED_TASK_JAVA_OPTS;
mapTaskJavaOpts = reduceTaskJavaOpts = TASK_OPTS_VAL;
}
- conf.set(mapTaskEnvKey,
+ conf.set(mapTaskEnvKey,
"MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
"PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
- conf.set(reduceTaskEnvKey,
+ conf.set(reduceTaskEnvKey,
+ "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
+ "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
+ conf.set(uberTaskEnvKey,
"MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," +
"PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
conf.set("path", System.getenv("PATH"));
conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts);
conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts);
+ conf.set(uberTaskJavaOptsKey, uberTaskJavaOpts);
RunningJob job = JobClient.runJob(conf);
+ // this is always true since runJob() throws if !job.isSuccessful() :
assertTrue("The environment checker job failed.", job.isSuccessful());
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Tue Mar 8 05:55:40 2011
@@ -94,6 +94,7 @@ public class TestMiniMRDFSSort extends T
job.setInt(JobContext.JVM_NUMTASKS_TORUN, -1);
job.setInt(JobContext.IO_SORT_MB, 1);
job.setNumMapTasks(12);
+ job.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
// Setup command-line arguments to 'sort'
String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Tue Mar 8 05:55:40 2011
@@ -40,11 +40,12 @@ public class TestReduceFetch extends Tes
job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.05");
job.setInt(JobContext.IO_SORT_FACTOR, 2);
job.setInt(JobContext.REDUCE_MERGE_INMEM_THRESHOLD, 4);
+ job.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
Counters c = runJob(job);
final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
- assertTrue("Expected all records spilled during reduce (" + spill + ")",
- spill >= 2 * out); // all records spill at map, reduce
+ assertTrue("Expected all records spilled during reduce (" + spill + " >= 2*"
+ + out + ")", spill >= 2 * out); // all records spill at map, reduce
assertTrue("Expected intermediate merges (" + spill + ")",
spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
}
@@ -60,6 +61,7 @@ public class TestReduceFetch extends Tes
job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "1.0");
job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
job.setNumMapTasks(MAP_TASKS);
+ job.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
Counters c = runJob(job);
final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java Tue Mar 8 05:55:40 2011
@@ -89,11 +89,12 @@ public class TestReduceFetchFromPartialM
job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14");
job.set(JobContext.SHUFFLE_MERGE_EPRCENT, "1.0");
+ job.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
Counters c = runJob(job);
final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
- assertTrue("Expected some records not spilled during reduce" + spill + ")",
- spill < 2 * out); // spilled map records, some records at the reduce
+ assertTrue("Expected some records not spilled during reduce (" + spill +
+ " < 2*" + out + ")", spill < 2 * out); // spilled map records, some records at the reduce
}
/**
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=1079206&r1=1079205&r2=1079206&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Mar 8 05:55:40 2011
@@ -138,13 +138,15 @@ public class TestMapReduceLocal extends
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TrackingTextInputFormat.class);
+ conf.setBoolean(JobContext.JOB_UBERTASK_ENABLE, false);
FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
assertTrue(job.waitForCompletion(false));
String out = readFile("out/part-r-00000");
System.out.println(out);
- assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
- out);
+ assertEquals(
+ "a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
+ out);
Counters ctrs = job.getCounters();
System.out.println("Counters: " + ctrs);
long mapIn = ctrs.findCounter(FileInputFormat.COUNTER_GROUP,
@@ -170,8 +172,8 @@ public class TestMapReduceLocal extends
assertEquals("combine out = reduce in", combineOut, reduceIn);
assertTrue("combine in > combine out", combineIn > combineOut);
assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);
- assertEquals("Mismatch in mergedMapOutputs", mergedMapOutputs, 2);
- assertEquals("Mismatch in shuffledMaps", shuffledMaps, 2);
+ assertEquals("Mismatch in mergedMapOutputs", 2, mergedMapOutputs);
+ assertEquals("Mismatch in shuffledMaps", 2, shuffledMaps);
String group = "Random Group";
CounterGroup ctrGrp = ctrs.getGroup(group);
assertEquals(0, ctrGrp.size());