Posted to mapreduce-issues@hadoop.apache.org by "Szilard Nemeth (Jira)" <ji...@apache.org> on 2023/01/10 08:45:00 UTC

[jira] [Updated] (MAPREDUCE-7431) ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade

     [ https://issues.apache.org/jira/browse/MAPREDUCE-7431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Szilard Nemeth updated MAPREDUCE-7431:
--------------------------------------
    Status: Patch Available  (was: Open)

> ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade
> -----------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-7431
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7431
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>    Affects Versions: 3.4.0
>            Reporter: Tamas Domok
>            Priority: Major
>         Attachments: sendMapPipeline.png
>
>
> HADOOP-15327 introduced some regressions in the ShuffleHandler.
> h3. 1. a memory leak
> {code:java}
> ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
> {code}
>  
> The Shuffle's channelRead did not release the message properly. The fix would be this:
> {code:java}
>       try {
>         // ....
>       } finally {
>         ReferenceCountUtil.release(msg);
>       }
> {code}
> Or even simpler:
> {code:java}
> extends SimpleChannelInboundHandler<FullHttpRequest>
> {code}
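To illustrate why the try/finally shape above closes the leak, here is a minimal sketch that models the reference-counting contract in plain Java. CountedMessage and ReleasingHandler are hypothetical stand-ins invented for this example, not Netty classes; in Netty 4 the real calls are ReferenceCountUtil.release(msg), or SimpleChannelInboundHandler, which releases the message after channelRead0 by default.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted message; NOT Netty's ByteBuf.
final class CountedMessage {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    // Returns true when the count drops to zero, i.e. the resource is freed.
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released more often than retained");
        }
        return remaining == 0;
    }
}

final class ReleasingHandler {
    // Same shape as the proposed fix: release in finally, so the message
    // is released even when the handling code throws.
    static void channelRead(CountedMessage msg, Runnable handling) {
        try {
            handling.run();
        } finally {
            msg.release();
        }
    }

    public static void main(String[] args) {
        CountedMessage ok = new CountedMessage();
        channelRead(ok, () -> { });
        System.out.println("after normal handling: refCnt=" + ok.refCnt());

        CountedMessage failing = new CountedMessage();
        try {
            channelRead(failing, () -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            // Handling failed, but the finally block still released the message.
        }
        System.out.println("after failed handling: refCnt=" + failing.refCnt());
    }
}
```

Without the finally block, the failing path would leave the count at 1, which is the situation the ResourceLeakDetector reports.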
> h3. 2. a bug in SSL mode with more than 1 reducer
> It manifested in multiple errors:
> {code:java}
> ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
> java.io.IOException: Broken pipe
> ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
> java.nio.channels.ClosedChannelException
> // if the reducer memory was not enough, then even this:
> Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#2
> 	at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:136)
> 	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
> 	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> 	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> 	at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
> 	at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
> 	at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
> 	at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:210)
> 	at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:91)
> {code}
> *Configuration* - mapred-site.xml
> {code:java}
> mapreduce.shuffle.ssl.enabled=true
> {code}
> An alternative is to build a custom jar where *FadvisedFileRegion* is replaced with *FadvisedChunkedFile* in {*}sendMapOutput{*}.
> *Reproduction*
> {code:java}
> hdfs dfs -rm -r -skipTrash /tmp/sort_input
> hdfs dfs -rm -r -skipTrash /tmp/sort_output
> yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar randomwriter "-Dmapreduce.randomwriter.totalbytes=10000000000" /tmp/sort_input
> yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar sort -Dmapreduce.job.reduce.slowstart.completedmaps=1 -r 40 /tmp/sort_input /tmp/sort_output | tee sort_app_output.txt
> {code}
> h3. ShuffleHandler's protocol
> {code:java}
> // HTTP Request
> GET /mapOutput?job=job_1672901779104_0001&reduce=0&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0,attempt_1672901779104_0001_m_000001_0,attempt_1672901779104_0001_m_000000_0,attempt_1672901779104_0001_m_000005_0,attempt_1672901779104_0001_m_000012_0,attempt_1672901779104_0001_m_000009_0,attempt_1672901779104_0001_m_000010_0,attempt_1672901779104_0001_m_000007_0,attempt_1672901779104_0001_m_000011_0,attempt_1672901779104_0001_m_000008_0,attempt_1672901779104_0001_m_000013_0,attempt_1672901779104_0001_m_000014_0,attempt_1672901779104_0001_m_000015_0,attempt_1672901779104_0001_m_000019_0,attempt_1672901779104_0001_m_000018_0,attempt_1672901779104_0001_m_000016_0,attempt_1672901779104_0001_m_000017_0,attempt_1672901779104_0001_m_000020_0,attempt_1672901779104_0001_m_000023_0 HTTP/1.1
> + keep alive headers
> // HTTP Response Headers
> content-length=sum(serialised ShuffleHeader in bytes + MapOutput size)
> + keep alive headers
> // Response Data (transfer-encoding=chunked)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> ...
> LastHttpContent
> // close socket if no keep-alive
> {code}
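The content-length rule in the protocol description above can be sketched as follows. This is an illustration only: Segment and ContentLength are hypothetical names, and the byte counts in main are made up; the real header size depends on how the ShuffleHeader is serialised.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical description of one (ShuffleHeader, MapOutput range) pair.
final class Segment {
    final int headerBytes;  // size of the serialised ShuffleHeader
    final long partLength;  // number of MapOutput bytes sent for this map

    Segment(int headerBytes, long partLength) {
        this.headerBytes = headerBytes;
        this.partLength = partLength;
    }
}

final class ContentLength {
    // content-length = sum over all maps of (header bytes + MapOutput bytes).
    static long of(List<Segment> segments) {
        long total = 0;
        for (Segment s : segments) {
            // addExact fails loudly on overflow instead of wrapping around.
            total = Math.addExact(total, Math.addExact((long) s.headerBytes, s.partLength));
        }
        return total;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
            new Segment(20, 1000L),
            new Segment(22, 0L),
            new Segment(21, 500L));
        System.out.println(of(segments));
    }
}
```

Every requested map contributes its header even when its MapOutput range is empty, so the header sizes cannot be left out of the sum.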
> h3. Issues
>  - {*}setResponseHeaders{*}: did not always set the content-length; the transfer-encoding=chunked header was also missing.
>  - {*}ReduceMapFileCount.operationComplete{*}: messed up the futures on the LastHttpContent.
>  - {*}ChannelGroup accepted{*}: is only used to close the channels, so there is no need for that magic 5. See the details [here|https://netty.io/4.0/api/io/netty/channel/group/ChannelGroup.html].
>  - {*}bossGroup{*}: should have only 1 thread for accepting connections.
>  - {*}Shuffle{*}: is unnecessarily Sharable; the 3 async sendMap calls per channel (see below) caused future errors when using FadvisedChunkedFile.
> h3. Max session open files is not an optimisation, it's actually wasting resources
> {code:java}
>     // by default maxSessionOpenFiles = 3
>     for (int i = 0; i < Math.min(handlerCtx.maxSessionOpenFiles, mapIds.size()); i++) {
>       ChannelFuture nextMap = sendMap(reduceContext);
>       if(nextMap == null) {
>         return;
>       }
>     }
> {code}
> !sendMapPipeline.png!
> In the end we create a single HTTP chunked stream, so there is no need to run 3 sendMap calls asynchronously: the futures will finish one by one, sequentially. The osCache magic from the FAdvised classes won't happen either, because the first readChunk is only called later.
> So this can be simplified a lot:
> {code:java}
> sendMap(reduceContext);
> {code}
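A rough model of the sequential behaviour, using java.util.concurrent.CompletableFuture in place of Netty's ChannelFuture. SequentialSender and its writer function are hypothetical names for this sketch, not part of ShuffleHandler; the assumption is that each completed write triggers the send for the next map, so at most one map is in flight per channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model: one send in flight at a time on a single stream.
final class SequentialSender {
    // Stand-in for the asynchronous write of one map output.
    private final Function<String, CompletableFuture<Void>> writer;
    // Records the order in which sends were started.
    final List<String> started = new ArrayList<>();

    SequentialSender(Function<String, CompletableFuture<Void>> writer) {
        this.writer = writer;
    }

    // Starts the send for a map only after the previous write has completed.
    CompletableFuture<Void> sendAll(List<String> mapIds) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String mapId : mapIds) {
            chain = chain.thenCompose(ignored -> {
                started.add(mapId);
                return writer.apply(mapId);
            });
        }
        return chain;
    }
}
```

With a writer whose futures are completed by hand, only the first map is started until its future completes; the second starts right after, and so on. The output order is the same as with three concurrent sends on one chunked stream, with less bookkeeping.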
> h3. My proposal
> Some refactoring: ShuffleHandler is split into multiple classes to make it possible to remove the sharable annotation.
>  - ShuffleChannel
>  - ShuffleChannelInitializer
>  - ShuffleChannelHandlerContext
>  - ShuffleChannelHandler
> TODO:
>  - fix/drop/refactor the existing unit tests
>  - add a proper unit test that covers both SSL and non-SSL mode and verifies the response data
>  - documentation about the protocol
> WIP: [github.com/tomicooler/hadoop|https://github.com/tomicooler/hadoop/commit/3bc027598aea4a3b02a1997fe5d485b9a6e5c41e]
> h3. Netty useful docs
>  * [User guide for 4.x|https://netty.io/wiki/user-guide-for-4.x.html]
>  * [New and noteworthy in 4.0|https://netty.io/wiki/new-and-noteworthy-in-4.0.html]
>  * [Reference counted objects|https://netty.io/wiki/reference-counted-objects.html]   (it will be changed in [Netty 5|https://netty.io/wiki/new-and-noteworthy-in-5.0.html])
>  * HttpStaticFileServer [example|https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)