Posted to mapreduce-issues@hadoop.apache.org by "Tamas Domok (Jira)" <ji...@apache.org> on 2023/01/09 15:58:00 UTC
[jira] [Created] (MAPREDUCE-7431) ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade
Tamas Domok created MAPREDUCE-7431:
--------------------------------------
Summary: ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade
Key: MAPREDUCE-7431
URL: https://issues.apache.org/jira/browse/MAPREDUCE-7431
Project: Hadoop Map/Reduce
Issue Type: Improvement
Affects Versions: 3.4.0
Reporter: Tamas Domok
Attachments: sendMapPipeline.png
HADOOP-15327 introduced some regressions in the ShuffleHandler.
h3. 1. a memory leak
{code:java}
ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
{code}
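The Shuffle handler's channelRead did not release the message; the fix would be:
{code:java}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  try {
    // .... handle the FullHttpRequest
  } finally {
    // Netty messages are reference-counted: release it even if handling throws
    ReferenceCountUtil.release(msg);
  }
}
{code}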
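Or, even simpler, extend SimpleChannelInboundHandler, which releases the message automatically after channelRead0 returns:
{code:java}
class Shuffle extends SimpleChannelInboundHandler<FullHttpRequest> {
  protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { /* .... */ }
}
{code}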
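To make the difference concrete, here is a JDK-only sketch of the release-in-finally shape that SimpleChannelInboundHandler provides. It does not use Netty: *RefCountedMsg*, *AutoReleasingHandler*, *OkHandler* and *FailingHandler* are hypothetical stand-ins, and the real SimpleChannelInboundHandler additionally only auto-releases messages of the type it accepts. The point it shows is that the message reaches a reference count of 0 whether handling succeeds or throws.
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class AutoReleaseSketch {
  // Hypothetical stand-in for a reference-counted message such as FullHttpRequest.
  // NOT Netty's API: it only models refCnt() and release().
  static final class RefCountedMsg {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
      return refCnt.get();
    }

    void release() {
      if (refCnt.decrementAndGet() < 0) {
        throw new IllegalStateException("released more than once");
      }
    }
  }

  // Models the shape of SimpleChannelInboundHandler: subclasses implement channelRead0,
  // and the base class releases the message in a finally block.
  abstract static class AutoReleasingHandler {
    final void channelRead(RefCountedMsg msg) {
      try {
        channelRead0(msg);
      } finally {
        msg.release();
      }
    }

    abstract void channelRead0(RefCountedMsg msg);
  }

  static final class OkHandler extends AutoReleasingHandler {
    @Override
    void channelRead0(RefCountedMsg msg) {
      // .... handle the request
    }
  }

  // A handler whose processing fails, to show the message is still released.
  static final class FailingHandler extends AutoReleasingHandler {
    @Override
    void channelRead0(RefCountedMsg msg) {
      throw new IllegalArgumentException("bad request");
    }
  }

  public static void main(String[] args) {
    RefCountedMsg ok = new RefCountedMsg();
    new OkHandler().channelRead(ok);

    RefCountedMsg bad = new RefCountedMsg();
    try {
      new FailingHandler().channelRead(bad);
    } catch (IllegalArgumentException expected) {
      // the exception propagates, but the finally block has already released the message
    }
    System.out.println(ok.refCnt() + " " + bad.refCnt()); // prints "0 0"
  }
}
{code}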
h3. 2. a bug in SSL mode with more than one reducer
It manifested as several different errors:
{code:java}
ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
java.io.IOException: Broken pipe
ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
java.nio.channels.ClosedChannelException
// and, if the reducer did not have enough memory, even this:
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#2
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:136)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:210)
at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:91)
{code}
*Configuration* - mapred-site.xml
{code:xml}
<property>
  <name>mapreduce.shuffle.ssl.enabled</name>
  <value>true</value>
</property>
{code}
Alternatively, build a custom jar in which *FadvisedFileRegion* is replaced with *FadvisedChunkedFile* in *sendMapOutput* (FadvisedChunkedFile is the class used in SSL mode).
*Reproduction*
{code:java}
hdfs dfs -rm -r -skipTrash /tmp/sort_input
hdfs dfs -rm -r -skipTrash /tmp/sort_output
yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar randomwriter "-Dmapreduce.randomwriter.totalbytes=10000000000" /tmp/sort_input
yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar sort -Dmapreduce.job.reduce.slowstart.completedmaps=1 -r 40 /tmp/sort_input /tmp/sort_output | tee sort_app_output.txt
{code}
h3. ShuffleHandler's protocol
{code:java}
// HTTP Request
GET /mapOutput?job=job_1672901779104_0001&reduce=0&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0,attempt_1672901779104_0001_m_000001_0,attempt_1672901779104_0001_m_000000_0,attempt_1672901779104_0001_m_000005_0,attempt_1672901779104_0001_m_000012_0,attempt_1672901779104_0001_m_000009_0,attempt_1672901779104_0001_m_000010_0,attempt_1672901779104_0001_m_000007_0,attempt_1672901779104_0001_m_000011_0,attempt_1672901779104_0001_m_000008_0,attempt_1672901779104_0001_m_000013_0,attempt_1672901779104_0001_m_000014_0,attempt_1672901779104_0001_m_000015_0,attempt_1672901779104_0001_m_000019_0,attempt_1672901779104_0001_m_000018_0,attempt_1672901779104_0001_m_000016_0,attempt_1672901779104_0001_m_000017_0,attempt_1672901779104_0001_m_000020_0,attempt_1672901779104_0001_m_000023_0 HTTP/1.1
+ keep alive headers
// HTTP Response Headers
content-length=sum(serialised ShuffleHeader in bytes + MapOutput size)
+ keep alive headers
// Response Data (transfer-encoding=chunked)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
...
LastHttpContent
// close socket if no keep-alive
{code}
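A minimal, JDK-only sketch of the framing above, assuming the request format shown: parse the /mapOutput query (the *map* parameter is a comma-separated list of attempt ids), then compute content-length as the sum of each serialised ShuffleHeader size plus its MapOutput segment length. The method names, the 40-byte header sizes used in main, and the exact keep-alive header set are illustrative assumptions, not ShuffleHandler's code.
{code:java}
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShuffleProtocolSketch {
  // Parses the query of a /mapOutput request into name -> comma-separated values.
  static Map<String, List<String>> parseQuery(String requestUri) {
    Map<String, List<String>> params = new HashMap<>();
    String query = URI.create(requestUri).getRawQuery();
    if (query == null) {
      return params;
    }
    for (String pair : query.split("&")) {
      int eq = pair.indexOf('=');
      if (eq < 0) {
        continue; // ignore parameters without a value
      }
      String key = URLDecoder.decode(pair.substring(0, eq), StandardCharsets.UTF_8);
      String value = URLDecoder.decode(pair.substring(eq + 1), StandardCharsets.UTF_8);
      params.put(key, Arrays.asList(value.split(",")));
    }
    return params;
  }

  // content-length = sum over the requested map outputs of
  // (serialised ShuffleHeader size in bytes + length of the MapOutput segment).
  static long contentLength(long[] headerSizes, long[] segmentLengths) {
    if (headerSizes.length != segmentLengths.length) {
      throw new IllegalArgumentException("expected one header per map output segment");
    }
    long total = 0;
    for (int i = 0; i < headerSizes.length; i++) {
      total += headerSizes[i] + segmentLengths[i];
    }
    return total;
  }

  // Response headers; the keep-alive header set here is an assumption of this sketch.
  static Map<String, String> responseHeaders(long contentLength, boolean keepAlive) {
    Map<String, String> headers = new LinkedHashMap<>();
    headers.put("content-length", Long.toString(contentLength));
    if (keepAlive) {
      headers.put("connection", "keep-alive");
    }
    return headers;
  }

  public static void main(String[] args) {
    Map<String, List<String>> q = parseQuery(
        "/mapOutput?job=job_1672901779104_0001&reduce=0"
            + "&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0");
    long len = contentLength(new long[] {40, 40}, new long[] {1000, 2500});
    // prints "2 {content-length=3580, connection=keep-alive}"
    System.out.println(q.get("map").size() + " " + responseHeaders(len, true));
  }
}
{code}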
h3. Issues
- {*}setResponseHeaders{*}: did not always set the content-length, and the transfer-encoding=chunked header was missing.
- {*}ReduceMapFileCount.operationComplete{*}: mishandled the futures around the LastHttpContent write.
- {*}ChannelGroup accepted{*}: is only used to close the channels, so there is no need for the magic number 5. See the details [here|https://netty.io/4.0/api/io/netty/channel/group/ChannelGroup.html].
- {*}bossGroup{*}: should have only one thread for accepting connections.
- {*}Shuffle{*}: is unnecessarily @Sharable; the 3 async sendMap calls per channel (see below) caused future errors when using FadvisedChunkedFile.
h3. Max session open files is not an optimisation; it actually wastes resources
{code:java}
// by default maxSessionOpenFiles = 3
for (int i = 0; i < Math.min(handlerCtx.maxSessionOpenFiles, mapIds.size()); i++) {
  ChannelFuture nextMap = sendMap(reduceContext);
  if (nextMap == null) {
    return;
  }
}
{code}
!sendMapPipeline.png!
At the end of the day we create an HTTP chunked stream, so there is no need to run 3 sendMap calls asynchronously: the futures finish one by one, sequentially. The osCache optimisation in the Fadvised classes does not happen any earlier either, because the first readChunk is only called later.
So this can be simplified a lot:
{code:java}
sendMap(reduceContext);
{code}
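The sequential behaviour can be modelled with plain CompletableFutures (a JDK substitute for Netty's ChannelFuture, not the real API): a single sendMap is started, and each completion triggers the next one, roughly the role *ReduceMapFileCount.operationComplete* plays, until the LastHttpContent marker is written. *sendMap*, *sendRemaining* and the string list standing in for the channel are hypothetical simplifications.
{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SequentialSendSketch {
  // Hypothetical stand-in for sendMap(): asynchronously "writes" one map output to the wire.
  static CompletableFuture<Void> sendMap(String mapId, List<String> wire) {
    return CompletableFuture.runAsync(() -> wire.add(mapId));
  }

  // Starts exactly one sendMap; the next one is only started when the previous completes.
  // After the last map output, the LastHttpContent marker is written.
  static CompletableFuture<Void> sendRemaining(Iterator<String> mapIds, List<String> wire) {
    if (!mapIds.hasNext()) {
      wire.add("LastHttpContent");
      return CompletableFuture.completedFuture(null);
    }
    return sendMap(mapIds.next(), wire).thenCompose(done -> sendRemaining(mapIds, wire));
  }

  static List<String> run(List<String> mapIds) {
    List<String> wire = Collections.synchronizedList(new ArrayList<>());
    sendRemaining(mapIds.iterator(), wire).join();
    return wire;
  }

  public static void main(String[] args) {
    // prints [m_000000, m_000001, m_000002, LastHttpContent]
    System.out.println(run(List.of("m_000000", "m_000001", "m_000002")));
  }
}
{code}
Since only one write is in flight per channel at any time, the order on the wire always matches the order of the requested map ids, and there is no concurrent per-channel state to protect.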
h3. My proposal
Some refactoring: ShuffleHandler is split into multiple classes so that the @Sharable annotation can be removed:
- ShuffleChannel
- ShuffleChannelInitializer
- ShuffleChannelHandlerContext
- ShuffleChannelHandler
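A rough, JDK-only sketch of why this split allows dropping @Sharable: an initializer creates a new handler for every accepted channel, so mutable per-connection state stays private to that connection, while shared settings live in a single context object. The class names only mirror the roles of ShuffleChannelInitializer, ShuffleChannelHandlerContext and ShuffleChannelHandler; their fields and methods are hypothetical, and this is not the WIP code.
{code:java}
import java.util.function.Supplier;

public class PerChannelHandlerSketch {
  // Shared, immutable settings for all connections (role of ShuffleChannelHandlerContext).
  static final class HandlerContext {
    final boolean sslEnabled; // hypothetical example setting

    HandlerContext(boolean sslEnabled) {
      this.sslEnabled = sslEnabled;
    }
  }

  // Per-connection handler (role of ShuffleChannelHandler). Its mutable state is safe
  // only because no instance is ever shared between channels.
  static final class ChannelHandler {
    final HandlerContext ctx;
    int requestsOnThisConnection;

    ChannelHandler(HandlerContext ctx) {
      this.ctx = ctx;
    }

    void onRequest() {
      requestsOnThisConnection++;
    }
  }

  // Role of ShuffleChannelInitializer: builds a fresh handler per accepted channel,
  // the way a Netty ChannelInitializer adds new handler instances in initChannel().
  static final class ChannelInitializer implements Supplier<ChannelHandler> {
    private final HandlerContext ctx;

    ChannelInitializer(HandlerContext ctx) {
      this.ctx = ctx;
    }

    @Override
    public ChannelHandler get() {
      return new ChannelHandler(ctx);
    }
  }

  public static void main(String[] args) {
    ChannelInitializer init = new ChannelInitializer(new HandlerContext(true));
    ChannelHandler first = init.get();
    ChannelHandler second = init.get();
    first.onRequest();
    first.onRequest();
    second.onRequest();
    // prints "2 1 true": separate per-connection state, one shared context
    System.out.println(first.requestsOnThisConnection + " " + second.requestsOnThisConnection
        + " " + (first.ctx == second.ctx));
  }
}
{code}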
TODO:
- fix/drop/refactor the existing unit tests
- add proper unit tests for SSL and non-SSL mode that properly verify the response data
- documentation about the protocol
WIP: [github.com/tomicooler/hadoop|https://github.com/tomicooler/hadoop/commit/3bc027598aea4a3b02a1997fe5d485b9a6e5c41e]
h3. Netty useful docs
* [User guide for 4.x|https://netty.io/wiki/user-guide-for-4.x.html]
* [New and noteworthy in 4.0|https://netty.io/wiki/new-and-noteworthy-in-4.0.html]
* [Reference counted objects|https://netty.io/wiki/reference-counted-objects.html] (it will be changed in [Netty 5|https://netty.io/wiki/new-and-noteworthy-in-5.0.html])
* HttpStaticFileServer [example|https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java]