You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/14 22:12:08 UTC
[07/37] hadoop git commit: MAPREDUCE-6361. NPE issue in shuffle
caused by concurrent issue between copySucceeded() in one thread and
copyFailed() in another thread on the same host. Contributed by Junping Du.
MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between copySucceeded() in one thread and copyFailed() in another thread on the same host. Contributed by Junping Du.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f4e2b3cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f4e2b3cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f4e2b3cc
Branch: refs/heads/HDFS-7240
Commit: f4e2b3cc0b1f4e49c306bc09a9dddd0495225bb2
Parents: 6d5da94
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Wed May 13 00:28:17 2015 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Wed May 13 00:28:17 2015 +0900
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 4 ++
.../task/reduce/ShuffleSchedulerImpl.java | 14 +++-
.../task/reduce/TestShuffleScheduler.java | 70 ++++++++++++++++++++
3 files changed, 85 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4e2b3cc/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ca92a97..15cdf90 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -414,6 +414,10 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6360. TestMapreduceConfigFields is placed in wrong dir,
introducing compile error (Arshad Mohammad via vinayakumarb)
+ MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between
+ copySucceeded() in one thread and copyFailed() in another thread on the
+ same host. (Junping Du via ozawa)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4e2b3cc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
index 8317672..ff0bb4f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
@@ -239,7 +239,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
}
private void updateStatus() {
- updateStatus(null);
+ updateStatus(null);
}
public synchronized void hostFailed(String hostname) {
@@ -263,9 +263,17 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
failureCounts.put(mapId, new IntWritable(1));
}
String hostname = host.getHostName();
+ IntWritable hostFailedNum = hostFailures.get(hostname);
+ // MAPREDUCE-6361: copySucceeded() in another thread may already have removed
+ // hostname from hostFailures.
+ // In that case, add hostname back to hostFailures to avoid an NPE below.
+ if (hostFailedNum == null) {
+ hostFailures.put(hostname, new IntWritable(1));
+ }
//report failure if already retried maxHostFailures times
- boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false;
-
+ boolean hostFail = hostFailures.get(hostname).get() >
+ getMaxHostFailures() ? true : false;
+
if (failures >= abortFailureLimit) {
try {
throw new IOException(failures + " failures downloading " + mapId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4e2b3cc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
index 6ac2320..654b748 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
@@ -213,6 +213,76 @@ public class TestShuffleScheduler {
Assert.assertEquals(copyMessage(10, 1, 2), progress.toString());
}
+ @SuppressWarnings("rawtypes") // raw Reducer class type used for combinerClass below
+ @Test
+ public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
+ JobConf job = new JobConf();
+ job.setNumMapTasks(2);
+ // MAPREDUCE-6361 regression test; first mock the shuffle context collaborators.
+ TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+ Reporter mockReporter = mock(Reporter.class);
+ FileSystem mockFileSystem = mock(FileSystem.class);
+ Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass();
+ @SuppressWarnings("unchecked") // cast of the raw mock to a generic type
+ CombineOutputCollector<K, V> mockCombineOutputCollector =
+ (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+ org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+ mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+ LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+ CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+ Counter mockCounter = mock(Counter.class);
+ TaskStatus mockTaskStatus = mock(TaskStatus.class);
+ Progress mockProgress = mock(Progress.class);
+ MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+ Task mockTask = mock(Task.class);
+ @SuppressWarnings("unchecked") // raw mock assigned to a generic type
+ MapOutput<K, V> output = mock(MapOutput.class);
+ // Below, the context is only used to hand its counters to the scheduler.
+ ShuffleConsumerPlugin.Context<K, V> context =
+ new ShuffleConsumerPlugin.Context<K, V>(
+ mockTaskAttemptID, job, mockFileSystem,
+ mockUmbilical, mockLocalDirAllocator,
+ mockReporter, mockCompressionCodec,
+ combinerClass, mockCombineOutputCollector,
+ mockCounter, mockCounter, mockCounter,
+ mockCounter, mockCounter, mockCounter,
+ mockTaskStatus, mockProgress, mockProgress,
+ mockTask, mockMapOutputFile, null);
+ TaskStatus status = new TaskStatus() { // reduce-side stub that ignores fetch-failed maps
+ @Override
+ public boolean getIsMap() {
+ return false;
+ }
+ @Override
+ public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+ }
+ };
+ Progress progress = new Progress();
+ ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
+ status, null, null, progress, context.getShuffledMapsCounter(),
+ context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+ // Both map attempts below are fetched from the same host, host1.
+ MapHost host1 = new MapHost("host1", null);
+ TaskAttemptID failedAttemptID = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 0), 0);
+
+ TaskAttemptID succeedAttemptID = new TaskAttemptID(
+ new org.apache.hadoop.mapred.TaskID(
+ new JobID("test",0), TaskType.MAP, 1), 1);
+
+ // Fetch failure for failedAttemptID, part I: mark host1 failed (presumably adds it to hostFailures).
+ scheduler.hostFailed(host1.getHostName());
+
+ // Successful fetch of succeedAttemptID from host1; this can clear host1 from hostFailures.
+ long bytes = (long)500 * 1024 * 1024; // 500 MiB
+ scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);
+
+ // Fetch failure for failedAttemptID, part II: copyFailed() on the same host.
+ // MAPREDUCE-6361: the test passes as long as no NullPointerException is thrown.
+ scheduler.copyFailed(failedAttemptID, host1, true, false);
+ }
+
private static String copyMessage(int attemptNo, double rate1, double rate2) {
int attemptZero = attemptNo - 1;
return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"