Posted to common-issues@hadoop.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/04/05 15:08:00 UTC
[jira] [Work logged] (HADOOP-15327) Upgrade MR ShuffleHandler to use Netty4
[ https://issues.apache.org/jira/browse/HADOOP-15327?focusedWorklogId=752939&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-752939 ]
ASF GitHub Bot logged work on HADOOP-15327:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Apr/22 15:07
Start Date: 05/Apr/22 15:07
Worklog Time Spent: 10m
Work Description: 9uapaw commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r842824343
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -1081,6 +1200,9 @@ public ChannelFuture sendMap(ReduceContext reduceContext)
return null;
}
}
+ if (nextMap == null) {
+ LOG.trace("Returning nextMap: null");
+ }
Review Comment:
I think it's superfluous: the null check is already in place in the try clause, and the exception clause already returns null.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -294,33 +941,31 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String mapId, int reduce,
MapOutputInfo info)
throws IOException {
- // send a shuffle header and a lot of data down the channel
- // to trigger a broken pipe
ShuffleHeader header =
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100000; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
@Override
protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
if (failures.size() == 0) {
Review Comment:
This `failures` is not a closure over the local variable; it is a reference to the member variable of ShuffleHandlerForTestst, hence the local variable will always be an empty list.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,11 +991,11 @@ public void setPort(int port) {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
- super.channelOpen(ctx, evt);
-
- if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+ NettyChannelHelper.channelActive(ctx.channel());
+ int numConnections = acceptedConnections.incrementAndGet();
Review Comment:
I think incrementing here is not right. Suppose maxShuffleConnections is 5 and there are 4 accepted connections. When this method is invoked, acceptedConnections becomes 5 and the error handling part kicks in, even though this fifth connection should be allowed.
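The scenario in this comment can be sketched in isolation. The class below is hypothetical and is not the ShuffleHandler code: `ConnectionLimiter`, `tryAccept`, and `tryAcceptOffByOne` are names made up for illustration, and rolling the counter back on rejection is an assumption. It only shows that, once the counter is incremented before the check, the comparison has to be `>` rather than `>=` for the connection that reaches the limit to be admitted.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical connection limiter (NOT the ShuffleHandler code). It shows how
 * the comparison operator decides whether increment-then-check is off by one.
 */
final class ConnectionLimiter {
  private final int maxConnections;  // zero or negative means "no limit"
  private final AtomicInteger active = new AtomicInteger();

  ConnectionLimiter(int maxConnections) {
    this.maxConnections = maxConnections;
  }

  /** Increment first, then reject only if the new count exceeds the limit. */
  boolean tryAccept() {
    int count = active.incrementAndGet();
    if (maxConnections > 0 && count > maxConnections) {
      // Assumption: a rejected connection is not counted as active.
      active.decrementAndGet();
      return false;
    }
    return true;
  }

  /** Variant with ">=" after the increment: the max-th connection is rejected. */
  boolean tryAcceptOffByOne() {
    int count = active.incrementAndGet();
    if (maxConnections > 0 && count >= maxConnections) {
      active.decrementAndGet();
      return false;
    }
    return true;
  }

  /** Call when a previously accepted connection closes. */
  void release() {
    active.decrementAndGet();
  }

  int activeCount() {
    return active.get();
  }
}
```

With a limit of 5 and 4 connections already accepted, `tryAccept()` admits the fifth connection and rejects the sixth, while `tryAcceptOffByOne()` already rejects the fifth. Which of the two the patch corresponds to depends on the comparison that follows the increment, which is not visible in the quoted hunk.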
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -785,37 +846,54 @@ private void removeJobShuffleInfo(JobID jobId) throws IOException {
}
}
- static class TimeoutHandler extends IdleStateAwareChannelHandler {
+ @VisibleForTesting
+ public void setUseOutboundExceptionHandler(boolean useHandler) {
+ this.useOutboundExceptionHandler = useHandler;
+ }
+ static class TimeoutHandler extends IdleStateHandler {
+ private final int connectionKeepAliveTimeOut;
private boolean enabledTimeout;
+ public TimeoutHandler(int connectionKeepAliveTimeOut) {
+ //disable reader timeout
+ //set writer timeout to configured timeout value
+ //disable all idle timeout
+ super(0, connectionKeepAliveTimeOut, 0, TimeUnit.SECONDS);
+ this.connectionKeepAliveTimeOut = connectionKeepAliveTimeOut;
+ }
Review Comment:
Previous logic omitted every timeout. What is the default here?
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -348,10 +993,13 @@ protected void sendError(ChannelHandlerContext ctx, String message,
header.readFields(input);
input.close();
+ assertEquals("sendError called when client closed connection", 0, failures.size());
+ Assert.assertEquals("Should have no caught exceptions",
+ new ArrayList<>(), failures);
Review Comment:
Use the EMPTY_LIST constant here.
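A minimal sketch of what this suggestion could look like, assuming the comment refers to `java.util.Collections.EMPTY_LIST` or its type-safe equivalent `Collections.emptyList()`. The class and method names are hypothetical; the point is only that `List.equals` compares contents, so a shared immutable empty list can replace `new ArrayList<>()` as the expected value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not part of TestShuffleHandler). */
final class EmptyListComparison {
  private EmptyListComparison() {
  }

  /**
   * Returns true if the given list has no elements, by comparing it with the
   * shared immutable empty list instead of allocating a new ArrayList.
   */
  static boolean hasNoFailures(List<?> failures) {
    return Collections.emptyList().equals(failures);
  }

  /** Builds a list with one entry, standing in for a recorded failure. */
  static List<String> oneFailure() {
    List<String> failures = new ArrayList<>();
    failures.add("example failure");
    return failures;
  }
}
```

In a JUnit assertion the same idea would read `assertEquals(message, Collections.emptyList(), failures)`, which makes the separate size check on the preceding line redundant.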
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -185,12 +187,23 @@
// This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
public static final long FETCH_RETRY_DELAY = 1000L;
public static final String RETRY_AFTER_HEADER = "Retry-After";
+ static final String ENCODER_HANDLER_NAME = "encoder";
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private ServerBootstrap bootstrap;
+ private Channel ch;
+ // FIXME: snemeth: need thread safety. - https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup
+ private final ChannelGroup accepted =
+ new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ private final AtomicInteger acceptedConnections = new AtomicInteger();
Review Comment:
I think it should be named activeConnections instead.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -920,49 +1007,67 @@ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
// fetch failure.
headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
- return;
+ } else {
+ super.channelActive(ctx);
Review Comment:
In the previous logic, this super method call was invoked before the error handling part. I also find the new order better; perhaps the previous behavior was a bug?
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -185,12 +187,23 @@
// This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
public static final long FETCH_RETRY_DELAY = 1000L;
public static final String RETRY_AFTER_HEADER = "Retry-After";
+ static final String ENCODER_HANDLER_NAME = "encoder";
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private ServerBootstrap bootstrap;
+ private Channel ch;
+ // FIXME: snemeth: need thread safety. - https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup
+ private final ChannelGroup accepted =
+ new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
Review Comment:
This is actually thread-safe according to the documentation. However, the GlobalEventExecutor instance is a non-scalable option; perhaps a custom executor will be needed here.
Issue Time Tracking
-------------------
Worklog Id: (was: 752939)
Time Spent: 8h 20m (was: 8h 10m)
> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
> Key: HADOOP-15327
> URL: https://issues.apache.org/jira/browse/HADOOP-15327
> Project: Hadoop Common
> Issue Type: Sub-task
> Reporter: Xiaoyu Yao
> Assignee: Szilard Nemeth
> Priority: Major
> Labels: pull-request-available
> Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, HADOOP-15327.005.patch, getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, testfailure-testMapFileAccess-emptyresponse.zip, testfailure-testReduceFromPartialMem.zip
>
> Time Spent: 8h 20m
> Remaining Estimate: 0h
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)