You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/05/20 11:13:56 UTC
svn commit: r776631 - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/
src/test/mapred/org/apache/hadoop/mapred/ src/webapps/job/
Author: ddas
Date: Wed May 20 09:13:55 2009
New Revision: 776631
URL: http://svn.apache.org/viewvc?rev=776631&view=rev
Log:
HADOOP-5572. Improves the progress reporting for the sort phase for both maps and reduces. Contributed by Ravi Gummadi.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
hadoop/core/trunk/src/webapps/job/taskdetails.jsp
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 20 09:13:55 2009
@@ -376,6 +376,9 @@
HADOOP-5873. Remove deprecated methods randomDataNode() and
getDatanodeByIndex(..) in FSNamesystem. (szetszwo)
+ HADOOP-5572. Improves the progress reporting for the sort phase for both
+ maps and reduces. (Ravi Gummadi via ddas)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java Wed May 20 09:13:55 2009
@@ -20,19 +20,32 @@
import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/** Utility to assist with generation of progress reports. Applications build
* a hierarchy of {@link Progress} instances, each modelling a phase of
* execution. The root is constructed with {@link #Progress()}. Nodes for
* sub-phases are created by calling {@link #addPhase()}.
*/
public class Progress {
+ private static final Log LOG = LogFactory.getLog(Progress.class);
private String status = "";
private float progress;
private int currentPhase;
private ArrayList<Progress> phases = new ArrayList<Progress>();
private Progress parent;
- private float progressPerPhase;
+ // Each phase can have different progress weightage. For example, in
+ // Map Task, map phase accounts for 66.7% and sort phase for 33.3%.
+ // User needs to give weightages as parameters to all phases(when adding
+ // phases) in a Progress object, if he wants to give weightage to any of the
+ // phases. So when nodes are added without specifying weightage, it means
+ // fixed weightage for all phases.
+ private boolean fixedWeightageForAllPhases = false;
+ private float progressPerPhase = 0.0f;
+ private ArrayList<Float> progressWeightagesForPhases = new ArrayList<Float>();
+
/** Creates a new root node. */
public Progress() {}
@@ -43,15 +56,73 @@
return phase;
}
- /** Adds a node to the tree. */
+ /** Adds a node to the tree. Gives equal weightage to all phases */
public synchronized Progress addPhase() {
+ Progress phase = addNewPhase();
+ // set equal weightage for all phases
+ progressPerPhase = 1.0f / (float)phases.size();
+ fixedWeightageForAllPhases = true;
+ return phase;
+ }
+
+ /** Adds a new phase. Caller needs to set progress weightage */
+ private synchronized Progress addNewPhase() {
Progress phase = new Progress();
phases.add(phase);
phase.setParent(this);
- progressPerPhase = 1.0f / (float)phases.size();
return phase;
}
+ /** Adds a named node with a specified progress weightage to the tree. */
+ public Progress addPhase(String status, float weightage) {
+ Progress phase = addPhase(weightage);
+ phase.setStatus(status);
+
+ return phase;
+ }
+
+ /** Adds a node with a specified progress weightage to the tree. */
+ public synchronized Progress addPhase(float weightage) {
+ Progress phase = new Progress();
+ progressWeightagesForPhases.add(weightage);
+ phases.add(phase);
+ phase.setParent(this);
+
+ // Ensure that the sum of weightages does not cross 1.0
+ float sum = 0;
+ for (int i = 0; i < phases.size(); i++) {
+ sum += progressWeightagesForPhases.get(i);
+ }
+ if (sum > 1.0) {
+ LOG.warn("Sum of weightages can not be more than 1.0; But sum = " + sum);
+ }
+
+ return phase;
+ }
+
+ /** Adds n nodes to the tree. Gives equal weightage to all phases */
+ public synchronized void addPhases(int n) {
+ for (int i = 0; i < n; i++) {
+ addNewPhase();
+ }
+ // set equal weightage for all phases
+ progressPerPhase = 1.0f / (float)phases.size();
+ fixedWeightageForAllPhases = true;
+ }
+
+ /**
+ * returns progress weightage of the given phase
+ * @param phaseNum the phase number of the phase (child node) for which we need
+ * progress weightage
+ * @return returns the progress weightage of the specified phase
+ */
+ float getProgressWeightage(int phaseNum) {
+ if (fixedWeightageForAllPhases) {
+ return progressPerPhase; // all phases are of equal weightage
+ }
+ return progressWeightagesForPhases.get(phaseNum);
+ }
+
synchronized Progress getParent() { return parent; }
synchronized void setParent(Progress parent) { this.parent = parent; }
@@ -89,8 +160,8 @@
}
/** Returns the overall progress of the root. */
- // this method probably does not need to be synchronized as getINternal() is synchronized
- // and the node's parent never changes. Still, it doesn't hurt.
+ // this method probably does not need to be synchronized as getInternal() is
+ // synchronized and the node's parent never changes. Still, it doesn't hurt.
public synchronized float get() {
Progress node = this;
while (node.getParent() != null) { // find the root
@@ -99,13 +170,37 @@
return node.getInternal();
}
+ /**
+ * Returns progress in this node. get() would give overall progress of the
+ * root node(not just given current node).
+ */
+ public synchronized float getProgress() {
+ return getInternal();
+ }
+
/** Computes progress in this node. */
private synchronized float getInternal() {
int phaseCount = phases.size();
if (phaseCount != 0) {
- float subProgress =
- currentPhase < phaseCount ? phase().getInternal() : 0.0f;
- return progressPerPhase*(currentPhase + subProgress);
+ float subProgress = 0.0f;
+ float progressFromCurrentPhase = 0.0f;
+ if (currentPhase < phaseCount) {
+ subProgress = phase().getInternal();
+ progressFromCurrentPhase =
+ getProgressWeightage(currentPhase) * subProgress;
+ }
+
+ float progressFromCompletedPhases = 0.0f;
+ if (fixedWeightageForAllPhases) { // same progress weightage for each phase
+ progressFromCompletedPhases = progressPerPhase * currentPhase;
+ }
+ else {
+ for (int i = 0; i < currentPhase; i++) {
+ // progress weightages of phases could be different. Add them
+ progressFromCompletedPhases += getProgressWeightage(i);
+ }
+ }
+ return progressFromCompletedPhases + progressFromCurrentPhase;
} else {
return progress;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Wed May 20 09:13:55 2009
@@ -20,7 +20,6 @@
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -121,8 +120,8 @@
LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
- SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
- TRACKER_NAME, STATE_STRING, VERSION
+ SHUFFLE_FINISHED, SORT_FINISHED, MAP_FINISHED, COUNTERS, SPLITS,
+ JOB_PRIORITY, HTTP_PORT, TRACKER_NAME, STATE_STRING, VERSION
}
/**
@@ -1394,29 +1393,56 @@
/**
* Log finish time of map task attempt.
* @param taskAttemptId task attempt id
- * @param finishTime finish time
+ * @param finishTime finish time of map task
* @param hostName host name
* @deprecated Use
- * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
+ * {@link #logFinished(TaskAttemptID, long, long, String, String, String,
+ * Counters)}
*/
@Deprecated
public static void logFinished(TaskAttemptID taskAttemptId, long finishTime,
String hostName){
- logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "",
- new Counters());
+ logFinished(taskAttemptId, finishTime, finishTime, hostName,
+ Values.MAP.name(), "", new Counters());
}
/**
* Log finish time of map task attempt.
*
* @param taskAttemptId task attempt id
- * @param finishTime finish time
+ * @param finishTime finish time of map task
+ * @param hostName host name
+ * @param taskType Whether the attempt is cleanup or setup or map
+ * @param stateString state string of the task attempt
+ * @param counter counters of the task attempt
+ * @deprecated Use
+ * {@link #logFinished(TaskAttemptID, long, long, String, String, String,
+ * Counters)}
+ */
+ @Deprecated
+ public static void logFinished(TaskAttemptID taskAttemptId,
+ long finishTime,
+ String hostName,
+ String taskType,
+ String stateString,
+ Counters counter) {
+ logFinished(taskAttemptId, finishTime, finishTime, hostName,
+ taskType, stateString, counter);
+ }
+
+ /**
+ * Log finish time of map task attempt.
+ *
+ * @param taskAttemptId task attempt id
+ * @param mapFinishTime finish time of map phase in map task
+ * @param finishTime finish time of map task
* @param hostName host name
* @param taskType Whether the attempt is cleanup or setup or map
* @param stateString state string of the task attempt
* @param counter counters of the task attempt
*/
public static void logFinished(TaskAttemptID taskAttemptId,
+ long mapFinishTime,
long finishTime,
String hostName,
String taskType,
@@ -1430,12 +1456,14 @@
JobHistory.log(writer, RecordTypes.MapAttempt,
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
+ Keys.MAP_FINISHED,
Keys.FINISH_TIME, Keys.HOSTNAME,
Keys.STATE_STRING, Keys.COUNTERS},
new String[]{taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
Values.SUCCESS.name(),
+ String.valueOf(mapFinishTime),
String.valueOf(finishTime), hostName,
stateString,
counter.makeEscapedCompactString()});
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed May 20 09:13:55 2009
@@ -2011,7 +2011,9 @@
status.getTaskTracker(),
ttStatus.getHttpPort(),
taskType);
- JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(),
+ JobHistory.MapAttempt.logFinished(status.getTaskID(),
+ status.getMapFinishTime(),
+ status.getFinishTime(),
trackerHostname, taskType,
status.getStateString(),
status.getCounters());
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed May 20 09:13:55 2009
@@ -1086,6 +1086,10 @@
taskStatus.setShuffleFinishTime(shuffleTime);
taskStatus.setSortFinishTime(sortTime);
}
+ else if (type.equals(Values.MAP.name())) {
+ taskStatus.setMapFinishTime(
+ Long.parseLong(attempt.get(Keys.MAP_FINISHED)));
+ }
// Add the counters
String counterString = attempt.get(Keys.COUNTERS);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed May 20 09:13:55 2009
@@ -73,8 +73,13 @@
private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
+ private Progress mapPhase;
+ private Progress sortPhase;
+
+
{ // set phase for this task
setPhase(TaskStatus.Phase.MAP);
+ getProgress().setStatus("map");
}
public MapTask() {
@@ -273,6 +278,11 @@
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
+ if (isMapTask()) {
+ mapPhase = getProgress().addPhase("map", 0.667f);
+ sortPhase = getProgress().addPhase("sort", 0.333f);
+ }
+
// start thread that will handle communication with parent
TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
reporter.startCommunicationThread();
@@ -348,6 +358,9 @@
try {
runner.run(in, collector, reporter);
+ mapPhase.complete();
+ setPhase(TaskStatus.Phase.SORT);
+ statusUpdate(umbilical);
collector.flush();
} finally {
//close
@@ -510,6 +523,9 @@
input.initialize(split, mapperContext);
mapper.run(mapperContext);
+ mapPhase.complete();
+ setPhase(TaskStatus.Phase.SORT);
+ statusUpdate(umbilical);
input.close();
output.close(mapperContext);
} catch (NoSuchMethodException e) {
@@ -1385,6 +1401,9 @@
return;
}
{
+ sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+ Merger.considerFinalMergeForProgress();
+
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) {
@@ -1406,14 +1425,17 @@
}
}
+ int mergeFactor = job.getInt("io.sort.factor", 100);
+ // sort the segments only if there are intermediate merges
+ boolean sortSegments = segmentList.size() > mergeFactor;
//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
keyClass, valClass,
- segmentList, job.getInt("io.sort.factor", 100),
+ segmentList, mergeFactor,
new Path(mapId.toString()),
- job.getOutputKeyComparator(), reporter,
- null, spilledRecordsCounter);
+ job.getOutputKeyComparator(), reporter, sortSegments,
+ null, spilledRecordsCounter, sortPhase.phase());
//write merged output to disk
long segmentStart = finalOut.getPos();
@@ -1430,6 +1452,8 @@
//close
writer.close();
+ sortPhase.startNextPhase();
+
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java Wed May 20 09:13:55 2009
@@ -18,9 +18,16 @@
package org.apache.hadoop.mapred;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
class MapTaskStatus extends TaskStatus {
+ private long mapFinishTime;
+ private long sortFinishTime;
+
public MapTaskStatus() {}
public MapTaskStatus(TaskAttemptID taskid, float progress,
@@ -35,6 +42,19 @@
return true;
}
+ /**
+ * Sets finishTime.
+ * @param finishTime finish time of task.
+ */
+ @Override
+ void setFinishTime(long finishTime) {
+ super.setFinishTime(finishTime);
+ if (mapFinishTime == 0) {
+ mapFinishTime = finishTime;
+ }
+ setSortFinishTime(finishTime);
+ }
+
@Override
public long getShuffleFinishTime() {
throw new UnsupportedOperationException("getShuffleFinishTime() not supported for MapTask");
@@ -46,12 +66,43 @@
}
@Override
+ public long getMapFinishTime() {
+ return mapFinishTime;
+ }
+
+ @Override
+ void setMapFinishTime(long mapFinishTime) {
+ this.mapFinishTime = mapFinishTime;
+ }
+
+ @Override
public long getSortFinishTime() {
- throw new UnsupportedOperationException("getSortFinishTime() not supported for MapTask");
+ return sortFinishTime;
}
@Override
void setSortFinishTime(long sortFinishTime) {
- throw new UnsupportedOperationException("setSortFinishTime() not supported for MapTask");
+ this.sortFinishTime = sortFinishTime;
+ }
+
+ @Override
+ synchronized void statusUpdate(TaskStatus status) {
+ super.statusUpdate(status);
+
+ if (status.getMapFinishTime() != 0) {
+ this.mapFinishTime = status.getMapFinishTime();
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ mapFinishTime = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeLong(mapFinishTime);
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Wed May 20 09:13:55 2009
@@ -55,13 +55,14 @@
int mergeFactor, Path tmpDir,
RawComparator<K> comparator, Progressable reporter,
Counters.Counter readsCounter,
- Counters.Counter writesCounter)
+ Counters.Counter writesCounter,
+ Progress mergePhase)
throws IOException {
return
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
reporter).merge(keyClass, valueClass,
mergeFactor, tmpDir,
- readsCounter, writesCounter);
+ readsCounter, writesCounter, mergePhase);
}
public static <K extends Object, V extends Object>
@@ -71,10 +72,11 @@
int mergeFactor, Path tmpDir,
RawComparator<K> comparator, Progressable reporter,
Counters.Counter readsCounter,
- Counters.Counter writesCounter)
+ Counters.Counter writesCounter,
+ Progress mergePhase)
throws IOException {
return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
- comparator, reporter, false, readsCounter, writesCounter);
+ comparator, reporter, false, readsCounter, writesCounter, mergePhase);
}
public static <K extends Object, V extends Object>
@@ -85,12 +87,14 @@
RawComparator<K> comparator, Progressable reporter,
boolean sortSegments,
Counters.Counter readsCounter,
- Counters.Counter writesCounter)
+ Counters.Counter writesCounter,
+ Progress mergePhase)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
sortSegments).merge(keyClass, valueClass,
mergeFactor, tmpDir,
- readsCounter, writesCounter);
+ readsCounter, writesCounter,
+ mergePhase);
}
static <K extends Object, V extends Object>
@@ -101,13 +105,15 @@
RawComparator<K> comparator, Progressable reporter,
boolean sortSegments,
Counters.Counter readsCounter,
- Counters.Counter writesCounter)
+ Counters.Counter writesCounter,
+ Progress mergePhase)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
sortSegments).merge(keyClass, valueClass,
mergeFactor, inMemSegments,
tmpDir,
- readsCounter, writesCounter);
+ readsCounter, writesCounter,
+ mergePhase);
}
public static <K extends Object, V extends Object>
@@ -235,6 +241,20 @@
}
}
+ // Boolean variable for including/considering final merge as part of sort
+ // phase or not. This is true in map task, false in reduce task. It is
+ // used in calculating mergeProgress.
+ static boolean includeFinalMerge = false;
+
+ /**
+ * Sets the boolean variable includeFinalMerge to true. Called from
+ * map task before calling merge() so that final merge of map task
+ * is also considered as part of sort phase.
+ */
+ static void considerFinalMergeForProgress() {
+ includeFinalMerge = true;
+ }
+
private static class MergeQueue<K extends Object, V extends Object>
extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
Configuration conf;
@@ -386,24 +406,41 @@
public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
int factor, Path tmpDir,
Counters.Counter readsCounter,
- Counters.Counter writesCounter)
+ Counters.Counter writesCounter,
+ Progress mergePhase)
throws IOException {
return merge(keyClass, valueClass, factor, 0, tmpDir,
- readsCounter, writesCounter);
+ readsCounter, writesCounter, mergePhase);
}
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
int factor, int inMem, Path tmpDir,
Counters.Counter readsCounter,
- Counters.Counter writesCounter)
+ Counters.Counter writesCounter,
+ Progress mergePhase)
throws IOException {
LOG.info("Merging " + segments.size() + " sorted segments");
-
- //create the MergeStreams from the sorted map created in the constructor
- //and dump the final output to a file
+
+ /*
+ * If there are inMemory segments, then they come first in the segments
+ * list and then the sorted disk segments. Otherwise(if there are only
+ * disk segments), then they are sorted segments if there are more than
+ * factor segments in the segments list.
+ */
int numSegments = segments.size();
int origFactor = factor;
int passNo = 1;
+ if (mergePhase != null) {
+ mergeProgress = mergePhase;
+ }
+
+ long totalBytes = computeBytesInMerges(factor, inMem);
+ if (totalBytes != 0) {
+ progPerByte = 1.0f / (float)totalBytes;
+ }
+
+ //create the MergeStreams from the sorted map created in the constructor
+ //and dump the final output to a file
do {
//get the factor for this pass of merge. We assume in-memory segments
//are the first entries in the segment list and that the pass factor
@@ -460,34 +497,39 @@
//if we have lesser number of segments remaining, then just return the
//iterator, else do another single level merge
if (numSegments <= factor) {
- // Reset totalBytesProcessed to track the progress of the final merge.
- // This is considered the progress of the reducePhase, the 3rd phase
- // of reduce task. Currently totalBytesProcessed is not used in sort
- // phase of reduce task(i.e. when intermediate merges happen).
- totalBytesProcessed = startBytes;
-
- //calculate the length of the remaining segments. Required for
- //calculating the merge progress
- long totalBytes = 0;
- for (int i = 0; i < segmentsToMerge.size(); i++) {
- totalBytes += segmentsToMerge.get(i).getLength();
+ if (!includeFinalMerge) { // for reduce task
+
+ // Reset totalBytesProcessed and recalculate totalBytes from the
+ // remaining segments to track the progress of the final merge.
+ // Final merge is considered as the progress of the reducePhase,
+ // the 3rd phase of reduce task.
+ totalBytesProcessed = 0;
+ totalBytes = 0;
+ for (int i = 0; i < segmentsToMerge.size(); i++) {
+ totalBytes += segmentsToMerge.get(i).getLength();
+ }
}
if (totalBytes != 0) //being paranoid
progPerByte = 1.0f / (float)totalBytes;
+ totalBytesProcessed += startBytes;
if (totalBytes != 0)
mergeProgress.set(totalBytesProcessed * progPerByte);
else
mergeProgress.set(1.0f); // Last pass and no segments left - we're done
LOG.info("Down to the last merge-pass, with " + numSegments +
- " segments left of total size: " + totalBytes + " bytes");
+ " segments left of total size: " +
+ (totalBytes - totalBytesProcessed) + " bytes");
return this;
} else {
LOG.info("Merging " + segmentsToMerge.size() +
" intermediate segments out of a total of " +
(segments.size()+segmentsToMerge.size()));
+ long bytesProcessedInPrevMerges = totalBytesProcessed;
+ totalBytesProcessed += startBytes;
+
//we want to spread the creation of temp files on multiple disks if
//available under the space constraints
long approxOutputSize = 0;
@@ -516,9 +558,27 @@
// Add the newly create segment to the list of segments to be merged
Segment<K, V> tempSegment =
new Segment<K, V>(conf, fs, outputFile, codec, false);
- segments.add(tempSegment);
+
+ // Insert new merged segment into the sorted list
+ int pos = Collections.binarySearch(segments, tempSegment,
+ segmentComparator);
+ if (pos < 0) {
+ // binary search failed. So position to be inserted at is -pos-1
+ pos = -pos-1;
+ }
+ segments.add(pos, tempSegment);
numSegments = segments.size();
- Collections.sort(segments, segmentComparator);
+
+ // Subtract the difference between expected size of new segment and
+ // actual size of new segment(Expected size of new segment is
+ // inputBytesOfThisMerge) from totalBytes. Expected size and actual
+ // size will match(almost) if combiner is not called in merge.
+ long inputBytesOfThisMerge = totalBytesProcessed -
+ bytesProcessedInPrevMerges;
+ totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+ if (totalBytes != 0) {
+ progPerByte = 1.0f / (float)totalBytes;
+ }
passNo++;
}
@@ -560,6 +620,57 @@
}
return subList;
}
+
+ /**
+ * Compute expected size of input bytes to merges, will be used in
+ * calculating mergeProgress. This simulates the above merge() method and
+ * tries to obtain the number of bytes that are going to be merged in all
+ * merges(assuming that there is no combiner called while merging).
+ * @param factor io.sort.factor
+ * @param inMem number of segments in memory to be merged
+ */
+ long computeBytesInMerges(int factor, int inMem) {
+ int numSegments = segments.size();
+ List<Long> segmentSizes = new ArrayList<Long>(numSegments);
+ long totalBytes = 0;
+ int n = numSegments - inMem;
+ // factor for 1st pass
+ int f = getPassFactor(factor, 1, n) + inMem;
+ n = numSegments;
+
+ for (int i = 0; i < numSegments; i++) {
+ // Not handling empty segments here assuming that it would not affect
+ // much in calculation of mergeProgress.
+ segmentSizes.add(segments.get(i).getLength());
+ }
+
+ if (includeFinalMerge) {
+ // just increment so that the following while loop iterates
+ // for 1 more iteration. This is to include final merge as part of
+ // the computation of expected input bytes of merges
+ n++;
+ }
+ while (n > f) {
+ long mergedSize = 0;
+ f = Math.min(f, segmentSizes.size());
+ for (int j = 0; j < f; j++) {
+ mergedSize += segmentSizes.remove(0);
+ }
+ totalBytes += mergedSize;
+
+ // insert new size into the sorted list
+ int pos = Collections.binarySearch(segmentSizes, mergedSize);
+ if (pos < 0) {
+ pos = -pos-1;
+ }
+ segmentSizes.add(pos, mergedSize);
+
+ n -= (f-1);
+ f = factor;
+ }
+
+ return totalBytes;
+ }
public Progress getProgress() {
return mergeProgress;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed May 20 09:13:55 2009
@@ -241,7 +241,7 @@
}
public void informReduceProgress() {
- reducePhase.set(super.in.getProgress().get()); // update progress
+ reducePhase.set(super.in.getProgress().getProgress()); // update progress
reporter.progress();
}
}
@@ -392,7 +392,7 @@
job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
- reporter, spilledRecordsCounter, null)
+ reporter, spilledRecordsCounter, null, sortPhase)
: reduceCopier.createKVIterator(job, rfs, reporter);
// free up the data structures
@@ -1786,9 +1786,7 @@
InMemFSMergeThread inMemFSMergeThread = null;
GetMapEventsThread getMapEventsThread = null;
- for (int i = 0; i < numMaps; i++) {
- copyPhase.addPhase(); // add sub-phase per file
- }
+ copyPhase.addPhases(numMaps); // add sub-phase per file
copiers = new ArrayList<MapOutputCopier>(numCopiers);
@@ -2243,6 +2241,10 @@
// segments required to vacate memory
List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
long inMemToDiskBytes = 0;
+ // sortPhaseFinished will be set to true if we call merge() separately
+ // here to vacate memory(i.e. there will not be any intermediate merges.
+ // In other words, only final merge is pending).
+ boolean sortPhaseFinished = false;
if (mapOutputsFilesInMemory.size() > 0) {
TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
inMemToDiskBytes = createInMemorySegments(memDiskSegments,
@@ -2250,12 +2252,20 @@
final int numMemDiskSegments = memDiskSegments.size();
if (numMemDiskSegments > 0 &&
ioSortFactor > mapOutputFilesOnDisk.size()) {
+ // As we have < ioSortFactor files on disk now, after this
+ // merging of inMemory segments, we would have at most ioSortFactor
+ // files on disk. So only final merge(directly feeding to reducers)
+ // will be pending. i.e. reduce phase will be pending.
+ sortPhaseFinished = true;
+
// must spill to disk, but can't retain in-mem for intermediate merge
final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
reduceTask.getTaskID(), inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
- tmpDir, comparator, reporter, spilledRecordsCounter, null);
+ tmpDir, comparator, reporter, spilledRecordsCounter, null,
+ sortPhase);
+
final Writer writer = new Writer(job, fs, outputPath,
keyClass, valueClass, codec, null);
try {
@@ -2311,10 +2321,12 @@
final int numInMemSegments = memDiskSegments.size();
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
+ Progress mergePhase = (sortPhaseFinished) ? null : sortPhase;
RawKeyValueIterator diskMerge = Merger.merge(
job, fs, keyClass, valueClass, diskSegments,
ioSortFactor, 0 == numInMemSegments ? 0 : numInMemSegments - 1,
- tmpDir, comparator, reporter, false, spilledRecordsCounter, null);
+ tmpDir, comparator, reporter, false, spilledRecordsCounter, null,
+ mergePhase);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
@@ -2324,7 +2336,7 @@
}
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
- comparator, reporter, spilledRecordsCounter, null);
+ comparator, reporter, spilledRecordsCounter, null, null);
}
class RawKVIteratorReader extends IFile.Reader<K,V> {
@@ -2465,7 +2477,7 @@
codec, mapFiles.toArray(new Path[mapFiles.size()]),
true, ioSortFactor, tmpDir,
conf.getOutputKeyComparator(), reporter,
- spilledRecordsCounter, null);
+ spilledRecordsCounter, null, null);
Merger.writeFile(iter, writer, reporter, conf);
writer.close();
@@ -2562,7 +2574,7 @@
inMemorySegments, inMemorySegments.size(),
new Path(reduceTask.getTaskID().toString()),
conf.getOutputKeyComparator(), reporter,
- spilledRecordsCounter, null);
+ spilledRecordsCounter, null, null);
if (combinerRunner == null) {
Merger.writeFile(rIter, writer, reporter, conf);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java Wed May 20 09:13:55 2009
@@ -88,6 +88,18 @@
}
@Override
+ public long getMapFinishTime() {
+ throw new UnsupportedOperationException(
+ "getMapFinishTime() not supported for ReduceTask");
+ }
+
+ @Override
+ void setMapFinishTime(long shuffleFinishTime) {
+ throw new UnsupportedOperationException(
+ "setMapFinishTime() not supported for ReduceTask");
+ }
+
+ @Override
public List<TaskAttemptID> getFetchFailedMaps() {
return failedFetchTasks;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Wed May 20 09:13:55 2009
@@ -148,6 +148,24 @@
void setShuffleFinishTime(long shuffleFinishTime) {}
/**
+ * Get map phase finish time for the task. If the map finish time was
+ * not set because the sort phase ended within the same heartbeat
+ * interval, it is set to the finish time of the next phase, i.e. the
+ * sort phase, when that is set.
+ * @return 0 if neither mapFinishTime nor sortFinishTime is set;
+ * otherwise an approximate map finish time.
+ */
+ public long getMapFinishTime() {
+ return 0;
+ }
+
+ /**
+ * Set map phase finish time.
+ * @param mapFinishTime
+ */
+ void setMapFinishTime(long mapFinishTime) {}
+
+ /**
* Get sort finish time for the task,. If sort finish time was not set
* due to sort and reduce phase finishing in same heartebat interval, it is
* set to finish time, when finish time is set.
@@ -197,12 +215,17 @@
if (oldPhase != phase){
// sort phase started
if (phase == TaskStatus.Phase.SORT){
- setShuffleFinishTime(System.currentTimeMillis());
+ if (oldPhase == TaskStatus.Phase.MAP) {
+ setMapFinishTime(System.currentTimeMillis());
+ }
+ else {
+ setShuffleFinishTime(System.currentTimeMillis());
+ }
}else if (phase == TaskStatus.Phase.REDUCE){
setSortFinishTime(System.currentTimeMillis());
}
+ this.phase = phase;
}
- this.phase = phase;
}
boolean inTaskCleanupPhase() {
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Wed May 20 09:13:55 2009
@@ -373,8 +373,8 @@
(status.equals("SUCCESS") || status.equals("FAILED") ||
status.equals("KILLED")));
- // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
- // SORT_FINISHED time
+ // Successful Reduce Task Attempts should have valid SHUFFLE_FINISHED
+ // time and SORT_FINISHED time
if (type.equals("REDUCE") && status.equals("SUCCESS")) {
time1 = attempt.get(Keys.SHUFFLE_FINISHED);
assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
@@ -389,6 +389,15 @@
assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
" in history file", areTimesInOrder(time1, time));
}
+ else if (type.equals("MAP") && status.equals("SUCCESS")) {
+ // Successful MAP Task Attempts should have valid MAP_FINISHED time
+ time1 = attempt.get(Keys.MAP_FINISHED);
+ assertTrue("MAP_FINISHED time of task attempt " + id +
+ " is in unexpected format:" + time1 +
+ " in history file", isTimeValid(time1));
+ assertTrue("MAP_FINISHED time of map task is < START_TIME " +
+ "in history file", areTimesInOrder(time, time1));
+ }
// check if hostname is valid
String hostname = attempt.get(Keys.HOSTNAME);
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java Wed May 20 09:13:55 2009
@@ -91,7 +91,7 @@
RawKeyValueIterator rawItr =
Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path},
false, conf.getInt("io.sort.factor", 100), tmpDir,
- new Text.Comparator(), new NullProgress(),null,null);
+ new Text.Comparator(), new NullProgress(), null, null, null);
@SuppressWarnings("unchecked") // WritableComparators are not generic
ReduceTask.ValuesIterator valItr =
new ReduceTask.ValuesIterator<Text,Text>(rawItr,
Modified: hadoop/core/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/taskdetails.jsp?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/taskdetails.jsp Wed May 20 09:13:55 2009
@@ -126,7 +126,12 @@
<table border=2 cellpadding="5" cellspacing="2">
<tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td>
<%
- if (!ts[0].getIsMap() && !isCleanupOrSetup) {
+ if (ts[0].getIsMap()) {
+ %>
+<td>Map Phase Finished</td>
+ <%
+ }
+ else if(!isCleanupOrSetup) {
%>
<td>Shuffle Finished</td><td>Sort Finished</td>
<%
@@ -181,7 +186,12 @@
out.print("<td>"
+ StringUtils.getFormattedTimeWithDiff(dateFormat, status
.getStartTime(), 0) + "</td>");
- if (!ts[i].getIsMap() && !isCleanupOrSetup) {
+ if (ts[i].getIsMap()) {
+ out.print("<td>"
+ + StringUtils.getFormattedTimeWithDiff(dateFormat, status
+ .getMapFinishTime(), status.getStartTime()) + "</td>");
+ }
+ else if (!isCleanupOrSetup) {
out.print("<td>"
+ StringUtils.getFormattedTimeWithDiff(dateFormat, status
.getShuffleFinishTime(), status.getStartTime()) + "</td>");