Posted to common-issues@hadoop.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/04/05 15:08:00 UTC

[jira] [Work logged] (HADOOP-15327) Upgrade MR ShuffleHandler to use Netty4

     [ https://issues.apache.org/jira/browse/HADOOP-15327?focusedWorklogId=752939&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-752939 ]

ASF GitHub Bot logged work on HADOOP-15327:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Apr/22 15:07
            Start Date: 05/Apr/22 15:07
    Worklog Time Spent: 10m 
      Work Description: 9uapaw commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r842824343


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -1081,6 +1200,9 @@ public ChannelFuture sendMap(ReduceContext reduceContext)
           return null;
         }
       }
+      if (nextMap == null) {
+        LOG.trace("Returning nextMap: null");
+      }

Review Comment:
   I think it's superfluous: the try clause already has a null check, and the exception clause already returns null.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -294,33 +941,31 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
               Channel ch, String user, String mapId, int reduce,
               MapOutputInfo info)
                   throws IOException {
-            // send a shuffle header and a lot of data down the channel
-            // to trigger a broken pipe
             ShuffleHeader header =
                 new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
             DataOutputBuffer dob = new DataOutputBuffer();
             header.write(dob);
-            ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
             dob = new DataOutputBuffer();
             for (int i = 0; i < 100000; ++i) {
               header.write(dob);
             }
-            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
           }
           @Override
           protected void sendError(ChannelHandlerContext ctx,
               HttpResponseStatus status) {
             if (failures.size() == 0) {

Review Comment:
   This failures is not a closure over the local variable; it's a reference to the member variable of ShuffleHandlerForTests, hence the local variable will always be an empty list.
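
To illustrate the name resolution described above, here is a minimal standalone sketch. `Handler` and `ShadowingDemo` are hypothetical stand-ins, not classes from the PR; the only assumption carried over is that the handler class declares a member with the same name as a local variable in the test method. Inside an anonymous subclass, the inherited field shadows the enclosing local, so the local list is never written to.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a handler class that owns a "failures" member.
class Handler {
  protected final List<String> failures = new ArrayList<>();

  void sendError(String message) {
    // no-op in the base class
  }
}

final class ShadowingDemo {
  /** Returns {size of the local list, size of the member list}. */
  static int[] run() {
    // Local list that the test intends to fill from the anonymous class.
    final List<String> failures = new ArrayList<>();

    Handler handler = new Handler() {
      @Override
      void sendError(String message) {
        // Resolves to the inherited Handler.failures field,
        // not to the local variable of the enclosing method.
        failures.add(message);
      }
    };

    handler.sendError("boom");
    // Outside the anonymous class, the simple name is the local again.
    return new int[] {failures.size(), handler.failures.size()};
  }
}
```

In this sketch the local list stays empty while the member list receives the entry, so an assertion on the local list would pass regardless of how often sendError is called.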



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,11 +991,11 @@ public void setPort(int port) {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
-      super.channelOpen(ctx, evt);
-
-      if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+      NettyChannelHelper.channelActive(ctx.channel());
+      int numConnections = acceptedConnections.incrementAndGet();

Review Comment:
   I don't think incrementing is right here. Suppose maxShuffleConnections is 5 and there are 4 accepted connections. If this method is invoked, acceptedConnections becomes 5 and the error handling kicks in, even though the connection should be allowed.
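
To make the boundary case concrete, here is a minimal standalone sketch, not the ShuffleHandler code. `ConnectionLimiter`, `tryAccept`, `release`, and `activeCount` are hypothetical names. It assumes the counter is incremented before the limit check; under that assumption the comparison has to be a strict `>` for the fifth connection to be admitted with a limit of 5, whereas `>=` would reject it as described above. Decrementing on rejection is a choice made for this sketch only.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper illustrating increment-then-check with a strict comparison.
final class ConnectionLimiter {
  private final int maxConnections; // 0 or negative means "no limit"
  private final AtomicInteger active = new AtomicInteger();

  ConnectionLimiter(int maxConnections) {
    this.maxConnections = maxConnections;
  }

  /** Returns true if the new connection fits within the limit. */
  boolean tryAccept() {
    // The returned count already includes the connection being opened.
    int count = active.incrementAndGet();
    if (maxConnections > 0 && count > maxConnections) {
      // Undo the increment so rejected connections are not counted.
      active.decrementAndGet();
      return false;
    }
    return true;
  }

  /** Call when an accepted connection closes. */
  void release() {
    active.decrementAndGet();
  }

  int activeCount() {
    return active.get();
  }
}
```

With a limit of 5, five calls to tryAccept() succeed and the sixth is rejected; after one release() a further call succeeds again.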



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -785,37 +846,54 @@ private void removeJobShuffleInfo(JobID jobId) throws IOException {
     }
   }
 
-  static class TimeoutHandler extends IdleStateAwareChannelHandler {
+  @VisibleForTesting
+  public void setUseOutboundExceptionHandler(boolean useHandler) {
+    this.useOutboundExceptionHandler = useHandler;
+  }
 
+  static class TimeoutHandler extends IdleStateHandler {
+    private final int connectionKeepAliveTimeOut;
     private boolean enabledTimeout;
 
+    public TimeoutHandler(int connectionKeepAliveTimeOut) {
+      //disable reader timeout
+      //set writer timeout to configured timeout value
+      //disable all idle timeout
+      super(0, connectionKeepAliveTimeOut, 0, TimeUnit.SECONDS);
+      this.connectionKeepAliveTimeOut = connectionKeepAliveTimeOut;
+    }

Review Comment:
   Previous logic omitted every timeout. What is the default here?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -348,10 +993,13 @@ protected void sendError(ChannelHandlerContext ctx, String message,
     header.readFields(input);
     input.close();
 
+    assertEquals("sendError called when client closed connection", 0, failures.size());
+    Assert.assertEquals("Should have no caught exceptions",
+        new ArrayList<>(), failures);

Review Comment:
   Use EMPTY_LIST constant here.
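
A small standalone sketch of the suggestion, assuming "EMPTY_LIST" refers to `java.util.Collections.EMPTY_LIST`; the sketch uses its typed equivalent `Collections.emptyList()`. `EmptyListCheck`, `hasNoFailures`, and `demo` are hypothetical names, and plain Java stands in for the JUnit assertion.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: compares against the shared immutable empty list
// instead of allocating a fresh ArrayList for every comparison.
final class EmptyListCheck {
  static boolean hasNoFailures(List<String> failures) {
    // List.equals compares contents, so any empty List equals emptyList().
    return Collections.<String>emptyList().equals(failures);
  }

  /** Returns true if the check holds before, and fails after, adding an entry. */
  static boolean demo() {
    List<String> failures = new ArrayList<>();
    boolean before = hasNoFailures(failures);
    failures.add("sendError called");
    boolean after = hasNoFailures(failures);
    return before && !after;
  }
}
```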



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -185,12 +187,23 @@
   // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
   public static final long FETCH_RETRY_DELAY = 1000L;
   public static final String RETRY_AFTER_HEADER = "Retry-After";
+  static final String ENCODER_HANDLER_NAME = "encoder";
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private ServerBootstrap bootstrap;
+  private Channel ch;
+  // FIXME: snemeth: need thread safety. - https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup
+  private final ChannelGroup accepted =
+      new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+  private final AtomicInteger acceptedConnections = new AtomicInteger();

Review Comment:
   I think it should be named activeConnections instead.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -920,49 +1007,67 @@ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
         // fetch failure.
         headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
         sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
-        return;
+      } else {
+        super.channelActive(ctx);

Review Comment:
   In the previous logic, this super method call was invoked before the error handling part. However, I also find the new order better; perhaps it was a bug in the previous logic?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -185,12 +187,23 @@
   // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
   public static final long FETCH_RETRY_DELAY = 1000L;
   public static final String RETRY_AFTER_HEADER = "Retry-After";
+  static final String ENCODER_HANDLER_NAME = "encoder";
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private ServerBootstrap bootstrap;
+  private Channel ch;
+  // FIXME: snemeth: need thread safety. - https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup
+  private final ChannelGroup accepted =
+      new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

Review Comment:
   This is actually thread-safe according to the documentation. However, the GlobalEventExecutor instance is a non-scalable option; perhaps a custom executor will be needed here.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 752939)
    Time Spent: 8h 20m  (was: 8h 10m)

> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
>                 Key: HADOOP-15327
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15327
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Xiaoyu Yao
>            Assignee: Szilard Nemeth
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, HADOOP-15327.005.patch, getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, testfailure-testMapFileAccess-emptyresponse.zip, testfailure-testReduceFromPartialMem.zip
>
>          Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)


