Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2014/10/15 17:53:52 UTC
git commit: MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting for maps. Contributed by Siqi Li
Repository: hadoop
Updated Branches:
refs/heads/trunk 128ace10c -> b9edad640
MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting for maps. Contributed by Siqi Li
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b9edad64
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b9edad64
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b9edad64
Branch: refs/heads/trunk
Commit: b9edad64034a9c8a121ec2b37792c190ba561e26
Parents: 128ace1
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Oct 15 15:52:53 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Oct 15 15:52:53 2014 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../hadoop/mapreduce/task/reduce/Fetcher.java | 2 +-
.../mapreduce/task/reduce/LocalFetcher.java | 2 +-
.../task/reduce/ShuffleSchedulerImpl.java | 107 +++++++++++--
.../task/reduce/TestShuffleScheduler.java | 157 +++++++++++++++++++
5 files changed, 257 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
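Background for the change: before this patch the reducer reported its shuffle transfer rate as total bytes shuffled divided by the wall-clock time since the shuffle started, so any time spent waiting for map outputs inflated the denominator and understated the bandwidth. The patch instead divides by the time during which copies were actually in progress. A minimal sketch of the difference, using the numbers from the first copy in the test below (40 MB copied between t=60s and t=100s) and assuming the shuffle started at t=0; the class and variable names here are illustrative, not from the patch, and the +1 second floor in the old code is ignored:

    // Sketch only: contrasts the old and new shuffle-rate formulas.
    public class ShuffleRateSketch {
      public static void main(String[] args) {
        long bytes = 40L * 1024 * 1024; // 40 MB copied
        long startMillis = 60000;       // copy began 60s after the shuffle started
        long endMillis = 100000;        // copy finished at 100s

        float mb = bytes / 1024f / 1024f;
        float oldRate = mb / (endMillis / 1000f);                 // ~0.4 MB/s: the 60s wait is counted
        float newRate = mb / ((endMillis - startMillis) / 1000f); // 1.0 MB/s: copy time only
        System.out.println(oldRate + " MB/s (old) vs " + newRate + " MB/s (new)");
      }
    }
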
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9edad64/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index dc63500..37e92cb 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -429,6 +429,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6115. TestPipeApplication#testSubmitter fails in trunk (Binglin
Chang via jlowe)
+ MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting
+ for maps (Siqi Li via jlowe)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9edad64/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index a416200..796394f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -544,7 +544,7 @@ class Fetcher<K,V> extends Thread {
retryStartTime = 0;
scheduler.copySucceeded(mapId, host, compressedLength,
- endTime - startTime, mapOutput);
+ startTime, endTime, mapOutput);
// Note successful shuffle
remaining.remove(mapId);
metrics.successFetch();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9edad64/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index 98256c2..6794c99 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -162,7 +162,7 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
}
}
- scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0,
+ scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
mapOutput);
return true; // successful fetch.
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9edad64/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
index e48a73a..985a1e1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -64,7 +65,8 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
private static final long INITIAL_PENALTY = 10000;
private static final float PENALTY_GROWTH_RATE = 1.3f;
private final static int REPORT_FAILURE_LIMIT = 10;
-
+ private static final float BYTES_PER_MILLIS_TO_MBS = 1000f / 1024 / 1024;
+
private final boolean[] finishedMaps;
private final int totalMaps;
@@ -92,6 +94,8 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
private final long startTime;
private long lastProgressTime;
+ private final CopyTimeTracker copyTimeTracker;
+
private volatile int maxMapRuntime = 0;
private final int maxFailedUniqueFetches;
private final int maxFetchFailuresBeforeReporting;
@@ -112,7 +116,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
Counters.Counter failedShuffleCounter) {
totalMaps = job.getNumMapTasks();
abortFailureLimit = Math.max(30, totalMaps / 10);
-
+ copyTimeTracker = new CopyTimeTracker();
remainingMaps = totalMaps;
finishedMaps = new boolean[remainingMaps];
this.reporter = reporter;
@@ -180,7 +184,8 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
public synchronized void copySucceeded(TaskAttemptID mapId,
MapHost host,
long bytes,
- long millis,
+ long startMillis,
+ long endMillis,
MapOutput<K,V> output
) throws IOException {
failureCounts.remove(mapId);
@@ -195,29 +200,48 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
notifyAll();
}
- // update the status
+ // update single copy task status
+ long copyMillis = (endMillis - startMillis);
+ if (copyMillis == 0) copyMillis = 1;
+ float bytesPerMillis = (float) bytes / copyMillis;
+ float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
+ String individualProgress = "copy task(" + mapId + " succeeded"
+ + " at " + mbpsFormat.format(transferRate) + " MB/s)";
+ // update the aggregated status
+ copyTimeTracker.add(startMillis, endMillis);
+
totalBytesShuffledTillNow += bytes;
- updateStatus();
+ updateStatus(individualProgress);
reduceShuffleBytes.increment(bytes);
lastProgressTime = Time.monotonicNow();
LOG.debug("map " + mapId + " done " + status.getStateString());
}
}
- private void updateStatus() {
- float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
+ private synchronized void updateStatus(String individualProgress) {
int mapsDone = totalMaps - remainingMaps;
- long secsSinceStart = (Time.monotonicNow() - startTime) / 1000 + 1;
-
- float transferRate = mbs / secsSinceStart;
+ long totalCopyMillis = copyTimeTracker.getCopyMillis();
+ if (totalCopyMillis == 0) totalCopyMillis = 1;
+ float bytesPerMillis = (float) totalBytesShuffledTillNow / totalCopyMillis;
+ float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
progress.set((float) mapsDone / totalMaps);
String statusString = mapsDone + " / " + totalMaps + " copied.";
status.setStateString(statusString);
- progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
- + mbpsFormat.format(transferRate) + " MB/s)");
+ if (individualProgress != null) {
+ progress.setStatus(individualProgress + " Aggregated copy rate(" +
+ mapsDone + " of " + totalMaps + " at " +
+ mbpsFormat.format(transferRate) + " MB/s)");
+ } else {
+ progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
+ + mbpsFormat.format(transferRate) + " MB/s)");
+ }
}
+ private void updateStatus() {
+ updateStatus(null);
+ }
+
public synchronized void hostFailed(String hostname) {
if (hostFailures.containsKey(hostname)) {
IntWritable x = hostFailures.get(hostname);
@@ -520,4 +544,63 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
public int getMaxHostFailures() {
return maxHostFailures;
}
+
+ private static class CopyTimeTracker {
+ List<Interval> intervals;
+ long copyMillis;
+ public CopyTimeTracker() {
+ intervals = Collections.emptyList();
+ copyMillis = 0;
+ }
+ public void add(long s, long e) {
+ Interval interval = new Interval(s, e);
+ copyMillis = getTotalCopyMillis(interval);
+ }
+
+ public long getCopyMillis() {
+ return copyMillis;
+ }
+ // This method captures the time during which any copy was in progress
+ // each copy time period is recorded in the Interval list
+ private long getTotalCopyMillis(Interval newInterval) {
+ if (newInterval == null) {
+ return copyMillis;
+ }
+ List<Interval> result = new ArrayList<Interval>(intervals.size() + 1);
+ for (Interval interval: intervals) {
+ if (interval.end < newInterval.start) {
+ result.add(interval);
+ } else if (interval.start > newInterval.end) {
+ result.add(newInterval);
+ newInterval = interval;
+ } else {
+ newInterval = new Interval(
+ Math.min(interval.start, newInterval.start),
+ Math.max(newInterval.end, interval.end));
+ }
+ }
+ result.add(newInterval);
+ intervals = result;
+
+ //compute total millis
+ long length = 0;
+ for (Interval interval : intervals) {
+ length += interval.getIntervalLength();
+ }
+ return length;
+ }
+
+ private static class Interval {
+ final long start;
+ final long end;
+ public Interval(long s, long e) {
+ start = s;
+ end = e;
+ }
+
+ public long getIntervalLength() {
+ return end - start;
+ }
+ }
+ }
}
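The aggregated rate above comes from CopyTimeTracker: it keeps a sorted list of disjoint intervals, merges each newly reported [start, end] span into that list, and getCopyMillis() returns the summed length of the merged intervals, i.e. the wall-clock time during which at least one copy was in progress. A standalone sketch of the same merging idea, for illustration only (it is not the committed class):

    import java.util.ArrayList;
    import java.util.List;

    // Minimal sketch of the interval-merge logic behind CopyTimeTracker.
    public class CopyTimeSketch {
      private List<long[]> intervals = new ArrayList<long[]>(); // sorted, disjoint {startMs, endMs}

      public void add(long start, long end) {
        List<long[]> merged = new ArrayList<long[]>(intervals.size() + 1);
        long[] cur = new long[] { start, end };
        for (long[] iv : intervals) {
          if (iv[1] < cur[0]) {
            merged.add(iv);            // existing interval ends before the new one starts
          } else if (iv[0] > cur[1]) {
            merged.add(cur);           // existing interval starts after the new one ends
            cur = iv;
          } else {
            // overlap or touch: widen the new interval to cover both
            cur = new long[] { Math.min(iv[0], cur[0]), Math.max(iv[1], cur[1]) };
          }
        }
        merged.add(cur);
        intervals = merged;
      }

      public long totalMillis() {
        long total = 0;
        for (long[] iv : intervals) {
          total += iv[1] - iv[0];
        }
        return total;
      }

      public static void main(String[] args) {
        CopyTimeSketch t = new CopyTimeSketch();
        t.add(60000, 100000); // 1st test copy: 40 MB from 60s to 100s
        t.add(0, 50000);      // 2nd test copy: 50 MB from 0s to 50s
        t.add(25000, 80000);  // 3rd test copy: 110 MB from 25s to 80s, bridging the first two
        // One merged interval [0s, 100s] -> 100000 ms; 200 MB over 100 s gives the
        // 2.00 MB/s aggregated rate asserted in the test below.
        System.out.println(t.totalMillis());
      }
    }
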
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9edad64/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
index 355a419..905fd44 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
@@ -17,9 +17,20 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
+import static org.mockito.Mockito.mock;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
+import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
@@ -66,4 +77,150 @@ public class TestShuffleScheduler {
0.0f);
Assert.assertTrue(scheduler.waitUntilDone(1));
}
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public <K, V> void TestAggregatedTransferRate() throws Exception {
+ JobConf job = new JobConf();
+ job.setNumMapTasks(10);
+ //mock creation
+ TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+ Reporter mockReporter = mock(Reporter.class);
+ FileSystem mockFileSystem = mock(FileSystem.class);
+ Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass();
+ @SuppressWarnings("unchecked") // needed for mock with generic
+ CombineOutputCollector<K, V> mockCombineOutputCollector =
+ (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+ org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+ mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+ LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+ CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+ Counter mockCounter = mock(Counter.class);
+ TaskStatus mockTaskStatus = mock(TaskStatus.class);
+ Progress mockProgress = mock(Progress.class);
+ MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+ Task mockTask = mock(Task.class);
+ @SuppressWarnings("unchecked")
+ MapOutput<K, V> output = mock(MapOutput.class);
+
+ ShuffleConsumerPlugin.Context<K, V> context =
+ new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, job, mockFileSystem,
+ mockUmbilical, mockLocalDirAllocator,
+ mockReporter, mockCompressionCodec,
+ combinerClass, mockCombineOutputCollector,
+ mockCounter, mockCounter, mockCounter,
+ mockCounter, mockCounter, mockCounter,
+ mockTaskStatus, mockProgress, mockProgress,
+ mockTask, mockMapOutputFile, null);
+ TaskStatus status = new TaskStatus() {
+ @Override
+ public boolean getIsMap() {
+ return false;
+ }
+ @Override
+ public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+ }
+ };
+ Progress progress = new Progress();
+ ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job, status, null,
+ null, progress, context.getShuffledMapsCounter(),
+ context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+ TaskAttemptID attemptID0 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 0), 0);
+
+ //adding the 1st interval, 40MB from 60s to 100s
+ long bytes = (long)40 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID0, new MapHost(null, null), bytes, 60000, 100000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000000_0 succeeded at 1.00 MB/s)"
+ + " Aggregated copy rate(1 of 10 at 1.00 MB/s)", progress.toString());
+
+ TaskAttemptID attemptID1 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 1), 1);
+
+ //adding the 2nd interval before the 1st interval, 50MB from 0s to 50s
+ bytes = (long)50 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID1, new MapHost(null, null), bytes, 0, 50000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000001_1 succeeded at 1.00 MB/s)"
+ + " Aggregated copy rate(2 of 10 at 1.00 MB/s)", progress.toString());
+
+ TaskAttemptID attemptID2 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 2), 2);
+
+ //adding the 3rd interval overlapping with the 1st and the 2nd interval
+ //110MB from 25s to 80s
+ bytes = (long)110 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID2, new MapHost(null, null), bytes, 25000, 80000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000002_2 succeeded at 2.00 MB/s)"
+ + " Aggregated copy rate(3 of 10 at 2.00 MB/s)", progress.toString());
+
+ TaskAttemptID attemptID3 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 3), 3);
+
+ //adding the 4th interval just after the 1st interval, 100MB from 100s to 300s
+ bytes = (long)100 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID3, new MapHost(null, null), bytes, 100000, 300000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000003_3 succeeded at 0.50 MB/s)"
+ + " Aggregated copy rate(4 of 10 at 1.00 MB/s)", progress.toString());
+
+ TaskAttemptID attemptID4 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 4), 4);
+
+ //adding the 5th interval just after the 4th, 50MB from 350s to 400s
+ bytes = (long)50 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID4, new MapHost(null, null), bytes, 350000, 400000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000004_4 succeeded at 1.00 MB/s)"
+ + " Aggregated copy rate(5 of 10 at 1.00 MB/s)", progress.toString());
+
+
+ TaskAttemptID attemptID5 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 5), 5);
+ //adding the 6th interval just after the 5th, 50MB from 450s to 500s
+ bytes = (long)50 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID5, new MapHost(null, null), bytes, 450000, 500000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000005_5 succeeded at 1.00 MB/s)"
+ + " Aggregated copy rate(6 of 10 at 1.00 MB/s)", progress.toString());
+
+ TaskAttemptID attemptID6 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 6), 6);
+ //adding the 7th interval between the 4th and 5th intervals, 20MB from 320s to 340s
+ bytes = (long)20 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID6, new MapHost(null, null), bytes, 320000, 340000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000006_6 succeeded at 1.00 MB/s)"
+ + " Aggregated copy rate(7 of 10 at 1.00 MB/s)", progress.toString());
+
+ TaskAttemptID attemptID7 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 7), 7);
+ //adding the 8th interval overlapping with the 4th, 5th, and 7th, 30MB from 290s to 350s
+ bytes = (long)30 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID7, new MapHost(null, null), bytes, 290000, 350000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000007_7 succeeded at 0.50 MB/s)"
+ + " Aggregated copy rate(8 of 10 at 1.00 MB/s)", progress.toString());
+
+ TaskAttemptID attemptID8 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 8), 8);
+ //adding the 9th interval overlapping with 5th and 6th, 50MB from 400s to 450s
+ bytes = (long)50 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID8, new MapHost(null, null), bytes, 400000, 450000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000008_8 succeeded at 1.00 MB/s)"
+ + " Aggregated copy rate(9 of 10 at 1.00 MB/s)", progress.toString());
+
+ TaskAttemptID attemptID9 = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 9), 9);
+ //adding the 10th interval overlapping with all intervals, 500MB from 0s to 500s
+ bytes = (long)500 * 1024 * 1024;
+ scheduler.copySucceeded(attemptID9, new MapHost(null, null), bytes, 0, 500000, output);
+ Assert.assertEquals("copy task(attempt_test_0000_m_000009_9 succeeded at 1.00 MB/s)"
+ + " Aggregated copy rate(10 of 10 at 2.00 MB/s)", progress.toString());
+
+ }
}
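As a quick check on the final assertion: the ten copies shuffle 40 + 50 + 110 + 100 + 50 + 50 + 20 + 30 + 50 + 500 = 1000 MB in total, and the 10th interval (0s to 500s) merges every earlier interval into a single 500 s span, so the aggregated rate is 1000 MB / 500 s = 2.00 MB/s, which is exactly the "Aggregated copy rate(10 of 10 at 2.00 MB/s)" string the test expects.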