You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2012/12/10 04:37:44 UTC
svn commit: r1419193 - in
/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/
hadoop-mapreduce-client/hadoop-mapreduce-cli...
Author: szetszwo
Date: Mon Dec 10 03:37:32 2012
New Revision: 1419193
URL: http://svn.apache.org/viewvc?rev=1419193&view=rev
Log:
Merge r1415804 through r1419190 from trunk.
Added:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
- copied unchanged from r1419190, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
- copied unchanged from r1419190, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
Modified:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (contents, props changed)
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1415804-1419190
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Mon Dec 10 03:37:32 2012
@@ -11,6 +11,9 @@ Trunk (Unreleased)
MAPREDUCE-2669. Add new examples for Mean, Median, and Standard Deviation.
(Plamen Jeliazkov via shv)
+ MAPREDUCE-4049. Experimental API to allow for alternate shuffle plugins.
+ (Avner BenHanoch via acmurthy)
+
IMPROVEMENTS
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
@@ -604,6 +607,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4817. Hardcoded task ping timeout kills tasks localizing large
amounts of data (tgraves)
+ MAPREDUCE-4836. Elapsed time for running tasks on AM web UI tasks page is 0
+ (Ravi Prakash via jeagles)
+
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1415804-1419190
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1415804-1419190
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java Mon Dec 10 03:37:32 2012
@@ -59,11 +59,12 @@ public class TaskInfo {
TaskReport report = task.getReport();
this.startTime = report.getStartTime();
this.finishTime = report.getFinishTime();
- this.elapsedTime = Times.elapsed(this.startTime, this.finishTime, false);
+ this.state = report.getTaskState();
+ this.elapsedTime = Times.elapsed(this.startTime, this.finishTime,
+ this.state == TaskState.RUNNING);
if (this.elapsedTime == -1) {
this.elapsedTime = 0;
}
- this.state = report.getTaskState();
this.progress = report.getProgress() * 100;
this.id = MRApps.toString(task.getID());
this.taskNum = task.getID().getId();
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Mon Dec 10 03:37:32 2012
@@ -340,6 +340,7 @@ public class ReduceTask extends Task {
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
+ ShuffleConsumerPlugin shuffleConsumerPlugin = null;
boolean isLocal = false;
// local if
@@ -358,8 +359,14 @@ public class ReduceTask extends Task {
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
- Shuffle shuffle =
- new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical,
+ Class<? extends ShuffleConsumerPlugin> clazz =
+ job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
+
+ shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+ LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
+
+ ShuffleConsumerPlugin.Context shuffleContext =
+ new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
@@ -368,7 +375,8 @@ public class ReduceTask extends Task {
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile);
- rIter = shuffle.run();
+ shuffleConsumerPlugin.init(shuffleContext);
+ rIter = shuffleConsumerPlugin.run();
} else {
// local job runner doesn't have a copy phase
copyPhase.complete();
@@ -399,6 +407,10 @@ public class ReduceTask extends Task {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
+
+ if (shuffleConsumerPlugin != null) {
+ shuffleConsumerPlugin.close();
+ }
done(umbilical, reporter);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Mon Dec 10 03:37:32 2012
@@ -85,6 +85,9 @@ public interface MRConfig {
public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
+ public static final String SHUFFLE_CONSUMER_PLUGIN =
+ "mapreduce.job.reduce.shuffle.consumer.plugin.class";
+
/**
* Configuration key to enable/disable IFile readahead.
*/
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Mon Dec 10 03:37:32 2012
@@ -34,73 +34,63 @@ import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
-public class Shuffle<K, V> implements ExceptionReporter {
+public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
private static final int PROGRESS_FREQUENCY = 2000;
private static final int MAX_EVENTS_TO_FETCH = 10000;
private static final int MIN_EVENTS_TO_FETCH = 100;
private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
- private final TaskAttemptID reduceId;
- private final JobConf jobConf;
- private final Reporter reporter;
- private final ShuffleClientMetrics metrics;
- private final TaskUmbilicalProtocol umbilical;
+ private ShuffleConsumerPlugin.Context context;
+
+ private TaskAttemptID reduceId;
+ private JobConf jobConf;
+ private Reporter reporter;
+ private ShuffleClientMetrics metrics;
+ private TaskUmbilicalProtocol umbilical;
- private final ShuffleScheduler<K,V> scheduler;
- private final MergeManager<K, V> merger;
+ private ShuffleScheduler<K,V> scheduler;
+ private MergeManager<K, V> merger;
private Throwable throwable = null;
private String throwingThreadName = null;
- private final Progress copyPhase;
- private final TaskStatus taskStatus;
- private final Task reduceTask; //Used for status updates
-
- public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS,
- TaskUmbilicalProtocol umbilical,
- LocalDirAllocator localDirAllocator,
- Reporter reporter,
- CompressionCodec codec,
- Class<? extends Reducer> combinerClass,
- CombineOutputCollector<K,V> combineCollector,
- Counters.Counter spilledRecordsCounter,
- Counters.Counter reduceCombineInputCounter,
- Counters.Counter shuffledMapsCounter,
- Counters.Counter reduceShuffleBytes,
- Counters.Counter failedShuffleCounter,
- Counters.Counter mergedMapOutputsCounter,
- TaskStatus status,
- Progress copyPhase,
- Progress mergePhase,
- Task reduceTask,
- MapOutputFile mapOutputFile) {
- this.reduceId = reduceId;
- this.jobConf = jobConf;
- this.umbilical = umbilical;
- this.reporter = reporter;
+ private Progress copyPhase;
+ private TaskStatus taskStatus;
+ private Task reduceTask; //Used for status updates
+
+ @Override
+ public void init(ShuffleConsumerPlugin.Context context) {
+ this.context = context;
+
+ this.reduceId = context.getReduceId();
+ this.jobConf = context.getJobConf();
+ this.umbilical = context.getUmbilical();
+ this.reporter = context.getReporter();
this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
- this.copyPhase = copyPhase;
- this.taskStatus = status;
- this.reduceTask = reduceTask;
+ this.copyPhase = context.getCopyPhase();
+ this.taskStatus = context.getStatus();
+ this.reduceTask = context.getReduceTask();
scheduler =
- new ShuffleScheduler<K,V>(jobConf, status, this, copyPhase,
- shuffledMapsCounter,
- reduceShuffleBytes, failedShuffleCounter);
- merger = new MergeManager<K, V>(reduceId, jobConf, localFS,
- localDirAllocator, reporter, codec,
- combinerClass, combineCollector,
- spilledRecordsCounter,
- reduceCombineInputCounter,
- mergedMapOutputsCounter,
- this, mergePhase, mapOutputFile);
+ new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase,
+ context.getShuffledMapsCounter(),
+ context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+ merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
+ context.getLocalDirAllocator(), reporter, context.getCodec(),
+ context.getCombinerClass(), context.getCombineCollector(),
+ context.getSpilledRecordsCounter(),
+ context.getReduceCombineInputCounter(),
+ context.getMergedMapOutputsCounter(),
+ this, context.getMergePhase(), context.getMapOutputFile());
}
+ @Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
@@ -171,6 +161,10 @@ public class Shuffle<K, V> implements Ex
return kvIter;
}
+ @Override
+ public void close(){
+ }
+
public synchronized void reportException(Throwable t) {
if (throwable == null) {
throwable = t;
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Dec 10 03:37:32 2012
@@ -748,6 +748,16 @@
</description>
</property>
+<property>
+ <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
+ <value>org.apache.hadoop.mapreduce.task.reduce.Shuffle</value>
+ <description>
+ Name of the class whose instance will be used
+ to send shuffle requests by the reduce tasks of this job.
+ The class must implement org.apache.hadoop.mapred.ShuffleConsumerPlugin.
+ </description>
+</property>
+
<!-- MR YARN Application properties -->
<property>
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1415804-1419190