You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/14 22:12:08 UTC

[07/37] hadoop git commit: MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between copySucceeded() in one thread and copyFailed() in another thread on the same host. Contributed by Junping Du.

MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between copySucceeded() in one thread and copyFailed() in another thread on the same host. Contributed by Junping Du.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f4e2b3cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f4e2b3cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f4e2b3cc

Branch: refs/heads/HDFS-7240
Commit: f4e2b3cc0b1f4e49c306bc09a9dddd0495225bb2
Parents: 6d5da94
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Wed May 13 00:28:17 2015 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Wed May 13 00:28:17 2015 +0900

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  4 ++
 .../task/reduce/ShuffleSchedulerImpl.java       | 14 +++-
 .../task/reduce/TestShuffleScheduler.java       | 70 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4e2b3cc/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ca92a97..15cdf90 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -414,6 +414,10 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6360. TestMapreduceConfigFields is placed in wrong dir, 
     introducing compile error (Arshad Mohammad via vinayakumarb)
 
+    MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between
+    copySucceeded() in one thread and copyFailed() in another thread on the
+    same host. (Junping Du via ozawa)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4e2b3cc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
index 8317672..ff0bb4f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
@@ -239,7 +239,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   }
   
   private void updateStatus() {
-    updateStatus(null);	
+    updateStatus(null);
   }
 
   public synchronized void hostFailed(String hostname) {
@@ -263,9 +263,17 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       failureCounts.put(mapId, new IntWritable(1));
     }
     String hostname = host.getHostName();
+    IntWritable hostFailedNum = hostFailures.get(hostname);
+    // MAPREDUCE-6361: hostname could get cleanup from hostFailures in another
+    // thread with copySucceeded.
+    // In this case, add back hostname to hostFailures to get rid of NPE issue.
+    if (hostFailedNum == null) {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
     //report failure if already retried maxHostFailures times
-    boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false;
-    
+    boolean hostFail = hostFailures.get(hostname).get() >
+        getMaxHostFailures() ? true : false;
+
     if (failures >= abortFailureLimit) {
       try {
         throw new IOException(failures + " failures downloading " + mapId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4e2b3cc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
index 6ac2320..654b748 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
@@ -213,6 +213,76 @@ public class TestShuffleScheduler {
     Assert.assertEquals(copyMessage(10, 1, 2), progress.toString());
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(2);
+    //mock creation
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = job.getCombinerClass();
+    @SuppressWarnings("unchecked")  // needed for mock with generic
+    CombineOutputCollector<K, V>  mockCombineOutputCollector =
+        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output = mock(MapOutput.class);
+
+    ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(
+            mockTaskAttemptID, job, mockFileSystem,
+            mockUmbilical, mockLocalDirAllocator,
+            mockReporter, mockCompressionCodec,
+            combinerClass, mockCombineOutputCollector,
+            mockCounter, mockCounter, mockCounter,
+            mockCounter, mockCounter, mockCounter,
+            mockTaskStatus, mockProgress, mockProgress,
+            mockTask, mockMapOutputFile, null);
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
+        status, null, null, progress, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+
+    MapHost host1 = new MapHost("host1", null);
+    TaskAttemptID failedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 0), 0);
+
+    TaskAttemptID succeedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 1), 1);
+
+    // handle output fetch failure for failedAttemptID, part I
+    scheduler.hostFailed(host1.getHostName());
+
+    // handle output fetch succeed for succeedAttemptID
+    long bytes = (long)500 * 1024 * 1024;
+    scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);
+
+    // handle output fetch failure for failedAttemptID, part II
+    // for MAPREDUCE-6361: verify no NPE exception get thrown out
+    scheduler.copyFailed(failedAttemptID, host1, true, false);
+  }
+
   private static String copyMessage(int attemptNo, double rate1, double rate2) {
     int attemptZero = attemptNo - 1;
     return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"