You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2017/09/13 22:22:11 UTC

hadoop git commit: MAPREDUCE-6957. shuffle hangs after a node manager connection timeout. Contributed by Jooseong Kim

Repository: hadoop
Updated Branches:
  refs/heads/trunk f153e6057 -> 4d98936ee


MAPREDUCE-6957. shuffle hangs after a node manager connection timeout. Contributed by Jooseong Kim


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d98936e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d98936e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d98936e

Branch: refs/heads/trunk
Commit: 4d98936eec1b5d196053426c70d455cf8f83f84f
Parents: f153e60
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Sep 13 17:21:13 2017 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Sep 13 17:21:13 2017 -0500

----------------------------------------------------------------------
 .../hadoop/mapreduce/task/reduce/Fetcher.java   | 18 ++---
 .../task/reduce/ShuffleSchedulerImpl.java       |  3 +
 .../mapreduce/task/reduce/TestFetcher.java      |  5 +-
 .../task/reduce/TestShuffleScheduler.java       | 80 ++++++++++++++++++++
 4 files changed, 93 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d98936e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 6112fb5..ce1551b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -278,9 +278,6 @@ class Fetcher<K,V> extends Thread {
       LOG.warn("Connection rejected by the host " + te.host +
           ". Will retry later.");
       scheduler.penalize(host, te.backoff);
-      for (TaskAttemptID left : remaining) {
-        scheduler.putBackKnownMapOutput(host, left);
-      }
     } catch (IOException ie) {
       boolean connectExcpt = ie instanceof ConnectException;
       ioErrs.increment(1);
@@ -293,11 +290,6 @@ class Fetcher<K,V> extends Thread {
       for(TaskAttemptID left: remaining) {
         scheduler.copyFailed(left, host, false, connectExcpt);
       }
-
-      // Add back all the remaining maps, WITHOUT marking them as failed
-      for(TaskAttemptID left: remaining) {
-        scheduler.putBackKnownMapOutput(host, left);
-      }
     }
 
     return input;
@@ -332,12 +324,14 @@ class Fetcher<K,V> extends Thread {
     
     // Construct the url and connect
     URL url = getMapOutputURL(host, maps);
-    DataInputStream input = openShuffleUrl(host, remaining, url);
-    if (input == null) {
-      return;
-    }
+    DataInputStream input = null;
     
     try {
+      input = openShuffleUrl(host, remaining, url);
+      if (input == null) {
+        return;
+      }
+
       // Loop through available map-outputs and fetch them
       // On any error, faildTasks is not null and we exit
       // after putting back the remaining maps to the 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d98936e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
index 2b6dc57..d9ce32c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
@@ -217,6 +217,9 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       reduceShuffleBytes.increment(bytes);
       lastProgressTime = Time.monotonicNow();
       LOG.debug("map " + mapId + " done " + status.getStateString());
+    } else {
+      LOG.warn("Aborting already-finished MapOutput for " + mapId);
+      output.abort();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d98936e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 8f9434d..934afd7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -471,7 +471,10 @@ public class TestFetcher {
 
     underTest.copyFromHost(host);
     verify(allErrs).increment(1);
-    verify(ss).copyFailed(map1ID, host, false, false);
+    verify(ss, times(1)).copyFailed(map1ID, host, false, false);
+    verify(ss, times(1)).copyFailed(map2ID, host, false, false);
+    verify(ss, times(1)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss, times(1)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d98936e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
index 654b748..5d0a027 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -283,6 +285,84 @@ public class TestShuffleScheduler {
     scheduler.copyFailed(failedAttemptID, host1, true, false);
   }
 
+  @Test
+  public <K, V> void testDuplicateCopySucceeded() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(2);
+    //mock creation
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass =
+        job.getCombinerClass();
+    @SuppressWarnings("unchecked")  // needed for mock with generic
+        CombineOutputCollector<K, V>  mockCombineOutputCollector =
+        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output1 = mock(MapOutput.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output2 = mock(MapOutput.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output3 = mock(MapOutput.class);
+
+    ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(
+            mockTaskAttemptID, job, mockFileSystem,
+            mockUmbilical, mockLocalDirAllocator,
+            mockReporter, mockCompressionCodec,
+            combinerClass, mockCombineOutputCollector,
+            mockCounter, mockCounter, mockCounter,
+            mockCounter, mockCounter, mockCounter,
+            mockTaskStatus, mockProgress, mockProgress,
+            mockTask, mockMapOutputFile, null);
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
+        status, null, null, progress, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+
+    MapHost host1 = new MapHost("host1", null);
+    TaskAttemptID succeedAttempt1ID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test", 0), TaskType.MAP, 0), 0);
+    TaskAttemptID succeedAttempt2ID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test", 0), TaskType.MAP, 0), 1);
+    TaskAttemptID succeedAttempt3ID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test", 0), TaskType.MAP, 1), 0);
+
+    long bytes = (long)500 * 1024 * 1024;
+    //First successful copy for map 0 should commit output
+    scheduler.copySucceeded(succeedAttempt1ID, host1, bytes, 0, 1, output1);
+    verify(output1).commit();
+
+    //Second successful copy for map 0 should abort output
+    scheduler.copySucceeded(succeedAttempt2ID, host1, bytes, 0, 1, output2);
+    verify(output2).abort();
+
+    //First successful copy for map 1 should commit output
+    scheduler.copySucceeded(succeedAttempt3ID, host1, bytes, 0, 1, output3);
+    verify(output3).commit();
+  }
+
   private static String copyMessage(int attemptNo, double rate1, double rate2) {
     int attemptZero = attemptNo - 1;
     return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org