You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/10/26 18:30:37 UTC

[11/50] [abbrv] hadoop git commit: MAPREDUCE-6728. Give fetchers hint when ShuffleHandler rejects a shuffling connection (haibochen via rkanter)

MAPREDUCE-6728. Give fetchers hint when ShuffleHandler rejects a shuffling connection (haibochen via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d4725bfc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d4725bfc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d4725bfc

Branch: refs/heads/YARN-4752
Commit: d4725bfcb2d300219d65395a78f957afbf37b201
Parents: c473490
Author: Robert Kanter <rk...@apache.org>
Authored: Fri Oct 21 17:46:17 2016 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Fri Oct 21 17:46:17 2016 -0700

----------------------------------------------------------------------
 .../hadoop/mapreduce/task/reduce/Fetcher.java   | 36 ++++++++++++++++
 .../hadoop/mapreduce/task/reduce/MapHost.java   |  4 --
 .../task/reduce/ShuffleSchedulerImpl.java       | 43 ++++++++++++++++----
 .../mapreduce/task/reduce/TestFetcher.java      | 22 ++++++++++
 .../apache/hadoop/mapred/ShuffleHandler.java    | 27 ++++++++++--
 .../hadoop/mapred/TestShuffleHandler.java       | 17 +++++---
 6 files changed, 126 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4725bfc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index be2f84f..c6889cb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -65,6 +65,11 @@ class Fetcher<K,V> extends Thread {
   /* Default read timeout (in milliseconds) */
   private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
 
+  // This should be kept in sync with ShuffleHandler.FETCH_RETRY_DELAY.
+  private static final long FETCH_RETRY_DELAY_DEFAULT = 1000L;
+  static final int TOO_MANY_REQ_STATUS_CODE = 429;
+  private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";
+
   protected final Reporter reporter;
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
@@ -269,6 +274,13 @@ class Fetcher<K,V> extends Thread {
       } else {
         input = new DataInputStream(connection.getInputStream());
       }
+    } catch (TryAgainLaterException te) {
+      LOG.warn("Connection rejected by the host " + te.host +
+          ". Will retry later.");
+      scheduler.penalize(host, te.backoff);
+      for (TaskAttemptID left : remaining) {
+        scheduler.putBackKnownMapOutput(host, left);
+      }
     } catch (IOException ie) {
       boolean connectExcpt = ie instanceof ConnectException;
       ioErrs.increment(1);
@@ -427,6 +439,19 @@ class Fetcher<K,V> extends Thread {
       throws IOException {
     // Validate response code
     int rc = connection.getResponseCode();
+    // See if the shuffleHandler rejected the connection due to too many
+    // reducer requests. If so, signal fetchers to back off.
+    if (rc == TOO_MANY_REQ_STATUS_CODE) {
+      long backoff = connection.getHeaderFieldLong(FETCH_RETRY_AFTER_HEADER,
+          FETCH_RETRY_DELAY_DEFAULT);
+      // in case we get a negative backoff from ShuffleHandler
+      if (backoff < 0) {
+        backoff = FETCH_RETRY_DELAY_DEFAULT;
+        LOG.warn("Get a negative backoff value from ShuffleHandler. Setting" +
+            " it to the default value " + FETCH_RETRY_DELAY_DEFAULT);
+      }
+      throw new TryAgainLaterException(backoff, url.getHost());
+    }
     if (rc != HttpURLConnection.HTTP_OK) {
       throw new IOException(
           "Got invalid response code " + rc + " from " + url +
@@ -728,4 +753,15 @@ class Fetcher<K,V> extends Thread {
       }
     }
   }
+
+  private static class TryAgainLaterException extends IOException {
+    public final long backoff;
+    public final String host;
+
+    public TryAgainLaterException(long backoff, String host) {
+      super("Too many requests to a map host");
+      this.backoff = backoff;
+      this.host = host;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4725bfc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
index 935931d..dfb28de 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
@@ -75,10 +75,6 @@ public class MapHost {
     state = State.BUSY;
   }
   
-  public synchronized void markPenalized() {
-    state = State.PENALIZED;
-  }
-  
   public synchronized int getNumKnownMapOutputs() {
     return maps.size();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4725bfc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
index c0d7e0f..a819771 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -105,7 +106,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
 
   private final boolean reportReadErrorImmediately;
-  private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
+  private long maxPenalty = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
   private int maxHostFailures;
 
   public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
@@ -136,7 +137,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
     this.reportReadErrorImmediately = job.getBoolean(
         MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
 
-    this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
+    this.maxPenalty = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
         MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
     this.maxHostFailures = job.getInt(
         MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
@@ -252,9 +253,26 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
     }
   }
 
+  @VisibleForTesting
+  synchronized int hostFailureCount(String hostname) {
+    int failures = 0;
+    if (hostFailures.containsKey(hostname)) {
+      failures = hostFailures.get(hostname).get();
+    }
+    return failures;
+  }
+
+  @VisibleForTesting
+  synchronized int fetchFailureCount(TaskAttemptID mapId) {
+    int failures = 0;
+    if (failureCounts.containsKey(mapId)) {
+      failures = failureCounts.get(mapId).get();
+    }
+    return failures;
+  }
+
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
       boolean readError, boolean connectExcpt) {
-    host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(mapId)) {
       IntWritable x = failureCounts.get(mapId);
@@ -290,15 +308,22 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
 
     long delay = (long) (INITIAL_PENALTY *
         Math.pow(PENALTY_GROWTH_RATE, failures));
-    if (delay > maxDelay) {
-      delay = maxDelay;
-    }
-
-    penalties.add(new Penalty(host, delay));
+    penalize(host, Math.min(delay, maxPenalty));
 
     failedShuffleCounter.increment(1);
   }
-  
+
+  /**
+   * Ask the shuffle scheduler to penalize a given host for a given amount
+   * of time before it reassigns a new fetcher to fetch from the host.
+   * @param host The host to penalize.
+   * @param delay The time to wait for before retrying
+   */
+  void penalize(MapHost host, long delay) {
+    host.penalize();
+    penalties.add(new Penalty(host, delay));
+  }
+
   public void reportLocalError(IOException ioe) {
     try {
       LOG.error("Shuffle failed : local error on this node: "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4725bfc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 998b3de..01e51e9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
@@ -176,6 +177,27 @@ public class TestFetcher {
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
+
+  @Test
+  public void testCopyFromHostConnectionRejected() throws Exception {
+    when(connection.getResponseCode())
+        .thenReturn(Fetcher.TOO_MANY_REQ_STATUS_CODE);
+
+    Fetcher<Text, Text> fetcher = new FakeFetcher<>(job, id, ss, mm, r, metrics,
+        except, key, connection);
+    fetcher.copyFromHost(host);
+
+    Assert.assertEquals("No host failure is expected.",
+        ss.hostFailureCount(host.getHostName()), 0);
+    Assert.assertEquals("No fetch failure is expected.",
+        ss.fetchFailureCount(map1ID), 0);
+    Assert.assertEquals("No fetch failure is expected.",
+        ss.fetchFailureCount(map2ID), 0);
+
+    verify(ss).penalize(eq(host), anyLong());
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
   
   @Test
   public void testCopyFromHostBogusHeader() throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4725bfc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 558ee38..4c18709 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -92,7 +92,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
@@ -166,6 +165,12 @@ public class ShuffleHandler extends AuxiliaryService {
   private static final String DATA_FILE_NAME = "file.out";
   private static final String INDEX_FILE_NAME = "file.out.index";
 
+  public static final HttpResponseStatus TOO_MANY_REQ_STATUS =
+      new HttpResponseStatus(429, "TOO MANY REQUESTS");
+  // This should be kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
+  public static final long FETCH_RETRY_DELAY = 1000L;
+  public static final String RETRY_AFTER_HEADER = "Retry-After";
+
   private int port;
   private ChannelFactory selector;
   private final ChannelGroup accepted = new DefaultChannelGroup();
@@ -795,7 +800,6 @@ public class ShuffleHandler extends AuxiliaryService {
   }
 
   class Shuffle extends SimpleChannelUpstreamHandler {
-
     private static final int MAX_WEIGHT = 10 * 1024 * 1024;
     private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
     private static final int ALLOWED_CONCURRENCY = 16;
@@ -875,7 +879,14 @@ public class ShuffleHandler extends AuxiliaryService {
         LOG.info(String.format("Current number of shuffle connections (%d) is " + 
             "greater than or equal to the max allowed shuffle connections (%d)", 
             accepted.size(), maxShuffleConnections));
-        evt.getChannel().close();
+
+        Map<String, String> headers = new HashMap<String, String>(1);
+        // notify fetchers to backoff for a while before closing the connection
+        // if the shuffle connection limit is hit. Fetchers are expected to
+        // handle this notification gracefully, that is, not treating this as a
+        // fetch failure.
+        headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
+        sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
         return;
       }
       accepted.add(evt.getChannel());
@@ -1245,6 +1256,11 @@ public class ShuffleHandler extends AuxiliaryService {
 
     protected void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
+      sendError(ctx, message, status, Collections.<String, String>emptyMap());
+    }
+
+    protected void sendError(ChannelHandlerContext ctx, String msg,
+        HttpResponseStatus status, Map<String, String> headers) {
       HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
       response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
       // Put shuffle version into http header
@@ -1252,8 +1268,11 @@ public class ShuffleHandler extends AuxiliaryService {
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
       response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      for (Map.Entry<String, String> header : headers.entrySet()) {
+        response.headers().set(header.getKey(), header.getValue());
+      }
       response.setContent(
-        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+          ChannelBuffers.copiedBuffer(msg, CharsetUtil.UTF_8));
 
       // Close the connection as soon as the error message is sent.
       ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4725bfc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 1717588..a927bf4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -36,7 +36,6 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
-import java.net.SocketException;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -80,7 +79,6 @@ import org.apache.hadoop.yarn.server.records.Version;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.AbstractChannel;
@@ -609,13 +607,20 @@ public class TestShuffleHandler {
 
     // This connection should be closed because it to above the limit
     try {
-      conns[2].getInputStream();
       rc = conns[2].getResponseCode();
-      Assert.fail("Expected a SocketException");
-    } catch (SocketException se) {
+      Assert.assertEquals("Expected a too-many-requests response code",
+          ShuffleHandler.TOO_MANY_REQ_STATUS.getCode(), rc);
+      long backoff = Long.valueOf(
+          conns[2].getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
+      Assert.assertTrue("The backoff value cannot be negative.", backoff > 0);
+      conns[2].getInputStream();
+      Assert.fail("Expected an IOException");
+    } catch (IOException ioe) {
       LOG.info("Expected - connection should not be open");
+    } catch (NumberFormatException ne) {
+      Assert.fail("Expected a numerical value for RETRY_AFTER header field");
     } catch (Exception e) {
-      Assert.fail("Expected a SocketException");
+      Assert.fail("Expected a IOException");
     }
     
     shuffleHandler.stop(); 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org