Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/11/09 16:09:42 UTC

[GitHub] [hadoop] K0K0V0K commented on a diff in pull request #3259: HADOOP-15327. Upgrade MR ShuffleHandler to use Netty4

K0K0V0K commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r1018076397


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -182,19 +184,29 @@ public class ShuffleHandler extends AuxiliaryService {
 
   public static final HttpResponseStatus TOO_MANY_REQ_STATUS =
       new HttpResponseStatus(429, "TOO MANY REQUESTS");
-  // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
+  // This should be kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
   public static final long FETCH_RETRY_DELAY = 1000L;
   public static final String RETRY_AFTER_HEADER = "Retry-After";
+  static final String ENCODER_HANDLER_NAME = "encoder";
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private ServerBootstrap bootstrap;
+  private Channel ch;
+  private final ChannelGroup accepted =
+      new DefaultChannelGroup(new DefaultEventExecutorGroup(5).next());

Review Comment:
   Maybe there could be a line comment explaining why we have to create 5 event executors.
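   
   For example, a sketch of what I mean (the constant name and comment text are just suggestions):
   
       // TODO: explain why 5 executor threads are needed for the channel group
       private static final int EVENT_EXECUTOR_THREADS = 5;
   
       private final ChannelGroup accepted =
           new DefaultChannelGroup(
               new DefaultEventExecutorGroup(EVENT_EXECUTOR_THREADS).next());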



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java:
##########
@@ -72,19 +73,21 @@
   private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";
 
   protected final Reporter reporter;
-  private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+  @VisibleForTesting
+  public enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
-  
-  private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+
+  @VisibleForTesting
+  public final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";

Review Comment:
   Won't checkstyle complain here, asking for public static final instead of public final static?
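   
   i.e. the usual modifier order would be:
   
       @VisibleForTesting
       public static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";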



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
-      super.channelOpen(ctx, evt);
-
-      if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+      NettyChannelHelper.channelActive(ctx.channel());
+      int numConnections = activeConnections.incrementAndGet();
+      if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) {
         LOG.info(String.format("Current number of shuffle connections (%d) is " + 
-            "greater than or equal to the max allowed shuffle connections (%d)", 
+            "greater than the max allowed shuffle connections (%d)",
             accepted.size(), maxShuffleConnections));
 
-        Map<String, String> headers = new HashMap<String, String>(1);
+        Map<String, String> headers = new HashMap<>(1);
         // notify fetchers to backoff for a while before closing the connection
         // if the shuffle connection limit is hit. Fetchers are expected to
         // handle this notification gracefully, that is, not treating this as a
         // fetch failure.
         headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
         sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
-        return;
+      } else {
+        super.channelActive(ctx);
+        accepted.add(ctx.channel());
+        LOG.debug("Added channel: {}, channel id: {}. Accepted number of connections={}",
+            ctx.channel(), ctx.channel().id(), activeConnections.get());
       }
-      accepted.add(evt.getChannel());
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      NettyChannelHelper.channelInactive(ctx.channel());
+      super.channelInactive(ctx);
+      int noOfConnections = activeConnections.decrementAndGet();
+      LOG.debug("New value of Accepted number of connections={}", noOfConnections);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
-      if (request.getMethod() != GET) {
-          sendError(ctx, METHOD_NOT_ALLOWED);
-          return;
+      Channel channel = ctx.channel();
+      LOG.trace("Executing channelRead, channel id: {}", channel.id());
+      HttpRequest request = (HttpRequest) msg;
+      LOG.debug("Received HTTP request: {}, channel id: {}", request, channel.id());
+      if (request.method() != GET) {
+        sendError(ctx, METHOD_NOT_ALLOWED);
+        return;
       }
       // Check whether the shuffle version is compatible
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          request.headers() != null ?
-              request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null)
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              request.headers() != null ?
-                  request.headers()
-                      .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) {
+      String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+      String httpHeaderName = ShuffleHeader.HTTP_HEADER_NAME;

Review Comment:
   Shouldn't this be DEFAULT_HTTP_HEADER_NAME?
   (Also, do we still need the if from line #1045?)
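   
   For reference, the pre-patch check compared the expected constants against the request
   headers, so I would expect something like this (a sketch based on the old logic; the
   sendError overload and the BAD_REQUEST import are assumptions from the surrounding code):
   
       String expectedName = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME;
       String expectedVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
       HttpHeaders headers = request.headers();
       if (headers == null
           || !expectedName.equals(headers.get(ShuffleHeader.HTTP_HEADER_NAME))
           || !expectedVersion.equals(headers.get(ShuffleHeader.HTTP_HEADER_VERSION))) {
         sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
         return;
       }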



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties:
##########
@@ -17,3 +17,5 @@ log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
+log4j.logger.io.netty=DEBUG
+log4j.logger.org.apache.hadoop.mapred=DEBUG

Review Comment:
   Won't this slow down the build process too much?
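   
   If it does, scoping the DEBUG level more narrowly might be enough, e.g. (just a suggestion):
   
       # Limit DEBUG to the class under test instead of all of io.netty
       # and org.apache.hadoop.mapred
       log4j.logger.org.apache.hadoop.mapred.ShuffleHandler=DEBUG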



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -291,36 +302,86 @@ public void operationComplete(ChannelFuture future) throws Exception {
     }
   }
 
+  static class NettyChannelHelper {
+    static ChannelFuture writeToChannel(Channel ch, Object obj) {
+      LOG.debug("Writing {} to channel: {}", obj.getClass().getSimpleName(), ch.id());
+      return ch.writeAndFlush(obj);
+    }
+
+    static ChannelFuture writeToChannelAndClose(Channel ch, Object obj) {
+      return writeToChannel(ch, obj).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    static ChannelFuture writeToChannelAndAddLastHttpContent(Channel ch, HttpResponse obj) {
+      writeToChannel(ch, obj);
+      return writeLastHttpContentToChannel(ch);
+    }
+
+    static ChannelFuture writeLastHttpContentToChannel(Channel ch) {
+      LOG.debug("Writing LastHttpContent, channel id: {}", ch.id());
+      return ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+    }
+
+    static ChannelFuture closeChannel(Channel ch) {
+      LOG.debug("Closing channel, channel id: {}", ch.id());
+      return ch.close();
+    }
+
+    static void closeChannels(ChannelGroup channelGroup) {
+      channelGroup.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    }
+
+    public static ChannelFuture closeAsIdle(Channel channel, int timeout) {
+      LOG.debug("Closing channel as writer was idle for {} seconds", timeout);
+      return closeChannel(channel);
+    }
+
+    public static void channelActive(Channel ch) {
+      LOG.debug("Executing channelActive, channel id: {}", ch.id());
+    }
+
+    public static void channelInactive(Channel channel) {
+      LOG.debug("Executing channelInactive, channel id: {}", channel.id());
+    }

Review Comment:
   Do we need the public keyword here?
   (If not, and we are changing these lines anyway, maybe channelActive could be renamed to logChannelActive, same for channelInactive, and the channel parameter could be renamed to ch.)
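   
   i.e. something like:
   
       static void logChannelActive(Channel ch) {
         LOG.debug("Executing channelActive, channel id: {}", ch.id());
       }
   
       static void logChannelInactive(Channel ch) {
         LOG.debug("Executing channelInactive, channel id: {}", ch.id());
       }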



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -668,34 +1357,61 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
       conns[i].connect();
     }
 
-    //Ensure first connections are okay
-    conns[0].getInputStream();
-    int rc = conns[0].getResponseCode();
-    Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
-    
-    conns[1].getInputStream();
-    rc = conns[1].getResponseCode();
-    Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
-
-    // This connection should be closed because it to above the limit
-    try {
-      rc = conns[2].getResponseCode();
-      Assert.assertEquals("Expected a too-many-requests response code",
-          ShuffleHandler.TOO_MANY_REQ_STATUS.getCode(), rc);
-      long backoff = Long.valueOf(
-          conns[2].getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
-      Assert.assertTrue("The backoff value cannot be negative.", backoff > 0);
-      conns[2].getInputStream();
-      Assert.fail("Expected an IOException");
-    } catch (IOException ioe) {
-      LOG.info("Expected - connection should not be open");
-    } catch (NumberFormatException ne) {
-      Assert.fail("Expected a numerical value for RETRY_AFTER header field");
-    } catch (Exception e) {
-      Assert.fail("Expected a IOException");
+    Map<Integer, List<HttpURLConnection>> mapOfConnections = Maps.newHashMap();
+    for (HttpURLConnection conn : conns) {
+      try {
+        conn.getInputStream();
+      } catch (IOException ioe) {
+        LOG.info("Expected - connection should not be open");
+      } catch (NumberFormatException ne) {
+        fail("Expected a numerical value for RETRY_AFTER header field");
+      } catch (Exception e) {
+        fail("Expected a IOException");
+      }
+      int statusCode = conn.getResponseCode();
+      LOG.debug("Connection status code: {}", statusCode);
+      mapOfConnections.putIfAbsent(statusCode, new ArrayList<>());
+      List<HttpURLConnection> connectionList = mapOfConnections.get(statusCode);
+      connectionList.add(conn);
     }
+
+    assertEquals(String.format("Expected only %s and %s response",
+            OK_STATUS, ShuffleHandler.TOO_MANY_REQ_STATUS),
+        Sets.newHashSet(
+            HttpURLConnection.HTTP_OK,
+            ShuffleHandler.TOO_MANY_REQ_STATUS.code()),
+        mapOfConnections.keySet());
     
-    shuffleHandler.stop(); 
+    List<HttpURLConnection> successfulConnections =
+        mapOfConnections.get(HttpURLConnection.HTTP_OK);
+    assertEquals(String.format("Expected exactly %d requests " +
+            "with %s response", maxAllowedConnections, OK_STATUS),
+        maxAllowedConnections, successfulConnections.size());
+
+    //Ensure exactly one connection is HTTP 429 (TOO MANY REQUESTS)
+    List<HttpURLConnection> closedConnections =
+        mapOfConnections.get(ShuffleHandler.TOO_MANY_REQ_STATUS.code());
+    assertEquals(String.format("Expected exactly %d %s response",
+            notAcceptedConnections, ShuffleHandler.TOO_MANY_REQ_STATUS),
+        notAcceptedConnections, closedConnections.size());
+
+    // This connection should be closed because it is above the maximum limit
+    HttpURLConnection conn = closedConnections.get(0);
+    assertEquals(String.format("Expected a %s response",
+            ShuffleHandler.TOO_MANY_REQ_STATUS),
+        ShuffleHandler.TOO_MANY_REQ_STATUS.code(), conn.getResponseCode());
+    long backoff = Long.parseLong(
+        conn.getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
+    assertTrue("The backoff value cannot be negative.", backoff > 0);
+
+    shuffleHandler.stop();
+
+    //It's okay to get a ClosedChannelException.
+    //All other kinds of exceptions means something went wrong
+    assertEquals("Should have no caught exceptions",
+        Collections.emptyList(), failures.stream()
+            .filter(f -> !(f instanceof ClosedChannelException))
+            .collect(toList()));

Review Comment:
   Maybe there could be a cleanup step here where we call the close method on every channel, to ensure we don't leave unused resources allocated? Or would that be too much?
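   
   Something like this at the end of the test (a sketch; assumes the conns array from above):
   
       // Disconnect every test connection so no sockets are left open
       for (HttpURLConnection conn : conns) {
         conn.disconnect();
       }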



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

