Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/11/09 18:12:21 UTC

[GitHub] [hadoop] brumi1024 commented on a diff in pull request #3259: HADOOP-15327. Upgrade MR ShuffleHandler to use Netty4

brumi1024 commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r1018265618


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
-      super.channelOpen(ctx, evt);
-
-      if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+      NettyChannelHelper.channelActive(ctx.channel());
+      int numConnections = activeConnections.incrementAndGet();
+      if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) {
         LOG.info(String.format("Current number of shuffle connections (%d) is " + 
-            "greater than or equal to the max allowed shuffle connections (%d)", 
+            "greater than the max allowed shuffle connections (%d)",
             accepted.size(), maxShuffleConnections));
 
-        Map<String, String> headers = new HashMap<String, String>(1);
+        Map<String, String> headers = new HashMap<>(1);
         // notify fetchers to backoff for a while before closing the connection
         // if the shuffle connection limit is hit. Fetchers are expected to
         // handle this notification gracefully, that is, not treating this as a
         // fetch failure.
         headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
         sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
-        return;
+      } else {
+        super.channelActive(ctx);
+        accepted.add(ctx.channel());
+        LOG.debug("Added channel: {}, channel id: {}. Accepted number of connections={}",
+            ctx.channel(), ctx.channel().id(), activeConnections.get());
       }
-      accepted.add(evt.getChannel());
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      NettyChannelHelper.channelInactive(ctx.channel());
+      super.channelInactive(ctx);
+      int noOfConnections = activeConnections.decrementAndGet();
+      LOG.debug("New value of Accepted number of connections={}", noOfConnections);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
-      if (request.getMethod() != GET) {
-          sendError(ctx, METHOD_NOT_ALLOWED);
-          return;
+      Channel channel = ctx.channel();
+      LOG.trace("Executing channelRead, channel id: {}", channel.id());
+      HttpRequest request = (HttpRequest) msg;
+      LOG.debug("Received HTTP request: {}, channel id: {}", request, channel.id());
+      if (request.method() != GET) {
+        sendError(ctx, METHOD_NOT_ALLOWED);
+        return;
       }
       // Check whether the shuffle version is compatible
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          request.headers() != null ?
-              request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null)
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              request.headers() != null ?
-                  request.headers()
-                      .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) {
+      String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+      String httpHeaderName = ShuffleHeader.HTTP_HEADER_NAME;

Review Comment:
   +1 on using the DEFAULT. However, the if is still needed: when request.headers() returns a non-null value, we need to check the version.
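
   To make the point about the null check concrete, here is a minimal sketch of the check that the removed lines express. It is not the PR's code: a plain Map stands in for Netty's HttpHeaders so the example runs on its own, the class name ShuffleVersionCheck and the method isCompatible are hypothetical, and the constant values are assumptions meant to mirror ShuffleHeader.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in, not the PR's code: a plain Map plays the role of
// Netty's HttpHeaders so the sketch runs without Netty on the classpath.
final class ShuffleVersionCheck {
  // Assumed values, intended to mirror the ShuffleHeader constants.
  static final String HTTP_HEADER_NAME = "name";
  static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce";
  static final String HTTP_HEADER_VERSION = "version";
  static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0";

  // Same outcome as the removed conditional expression: missing headers
  // yield null values, which never equal the expected defaults.
  static boolean isCompatible(Map<String, String> headers) {
    if (headers == null) {
      return false;
    }
    // Headers are present, so the actual values must be compared.
    return DEFAULT_HTTP_HEADER_NAME.equals(headers.get(HTTP_HEADER_NAME))
        && DEFAULT_HTTP_HEADER_VERSION.equals(headers.get(HTTP_HEADER_VERSION));
  }

  public static void main(String[] args) {
    Map<String, String> good = new HashMap<>();
    good.put(HTTP_HEADER_NAME, "mapreduce");
    good.put(HTTP_HEADER_VERSION, "1.0.0");

    Map<String, String> wrongVersion = new HashMap<>(good);
    wrongVersion.put(HTTP_HEADER_VERSION, "0.9.0");

    System.out.println(isCompatible(good));         // true
    System.out.println(isCompatible(wrongVersion)); // false
    System.out.println(isCompatible(null));         // false
  }
}
```

   Putting the expected constant on the left of equals() keeps the comparison null-safe, so a header that is absent from a non-null header set is rejected without a separate null test on each value.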



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java:
##########
@@ -72,19 +73,21 @@
   private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";
 
   protected final Reporter reporter;
-  private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+  @VisibleForTesting
+  public enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
-  
-  private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+
+  @VisibleForTesting
+  public final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";

Review Comment:
   Changing this would mean the others have to change as well, which could complicate the diff. I'm not sure it's worth it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

For additional commands, e-mail: common-issues-help@hadoop.apache.org