Posted to mapreduce-issues@hadoop.apache.org by "Tamas Domok (Jira)" <ji...@apache.org> on 2023/01/09 15:58:00 UTC

[jira] [Created] (MAPREDUCE-7431) ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade

Tamas Domok created MAPREDUCE-7431:
--------------------------------------

             Summary: ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade
                 Key: MAPREDUCE-7431
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7431
             Project: Hadoop Map/Reduce
          Issue Type: Improvement
    Affects Versions: 3.4.0
            Reporter: Tamas Domok
         Attachments: sendMapPipeline.png

HADOOP-15327 introduced some regressions in the ShuffleHandler.
h3. 1. A memory leak
{code:java}
ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
{code}
 
The Shuffle's channelRead did not release the message properly; the fix would be:
{code:java}
      try {
        // ....
      } finally {
        ReferenceCountUtil.release(msg);
      }
{code}
Or even simpler:
{code:java}
extends SimpleChannelInboundHandler<FullHttpRequest>
{code}
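For illustration, a minimal sketch of the SimpleChannelInboundHandler approach, assuming Netty 4.1 (the handler name and body here are illustrative, not the actual ShuffleHandler code):
{code:java}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;

// SimpleChannelInboundHandler releases the inbound message automatically
// after channelRead0 returns, so no manual ReferenceCountUtil.release is needed.
public class ShuffleRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
    // handle the /mapOutput request here; Netty releases `request` for us
  }
}
{code}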
h3. 2. A bug in SSL mode with more than one reducer

It manifested in multiple errors:
{code:java}
ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
java.io.IOException: Broken pipe

ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
java.nio.channels.ClosedChannelException

// if the reducer did not have enough memory, then even this:
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#2
	at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:136)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
	at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
	at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
	at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:210)
	at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:91)
{code}
*Configuration* - mapred-site.xml
{code:java}
mapreduce.shuffle.ssl.enabled=true
{code}
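For reference, the same setting in the standard mapred-site.xml property syntax:
{code:xml}
<!-- mapred-site.xml: enable SSL for the shuffle transfer -->
<property>
  <name>mapreduce.shuffle.ssl.enabled</name>
  <value>true</value>
</property>
{code}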
An alternative is to build a custom jar where *FadvisedFileRegion* is replaced with *FadvisedChunkedFile* in {*}sendMapOutput{*}.

*Reproduction*
{code:java}
hdfs dfs -rm -r -skipTrash /tmp/sort_input
hdfs dfs -rm -r -skipTrash /tmp/sort_output
yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar randomwriter "-Dmapreduce.randomwriter.totalbytes=10000000000" /tmp/sort_input
yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar sort -Dmapreduce.job.reduce.slowstart.completedmaps=1 -r 40 /tmp/sort_input /tmp/sort_output | tee sort_app_output.txt
{code}
h3. ShuffleHandler's protocol
{code:java}
// HTTP Request
GET /mapOutput?job=job_1672901779104_0001&reduce=0&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0,attempt_1672901779104_0001_m_000001_0,attempt_1672901779104_0001_m_000000_0,attempt_1672901779104_0001_m_000005_0,attempt_1672901779104_0001_m_000012_0,attempt_1672901779104_0001_m_000009_0,attempt_1672901779104_0001_m_000010_0,attempt_1672901779104_0001_m_000007_0,attempt_1672901779104_0001_m_000011_0,attempt_1672901779104_0001_m_000008_0,attempt_1672901779104_0001_m_000013_0,attempt_1672901779104_0001_m_000014_0,attempt_1672901779104_0001_m_000015_0,attempt_1672901779104_0001_m_000019_0,attempt_1672901779104_0001_m_000018_0,attempt_1672901779104_0001_m_000016_0,attempt_1672901779104_0001_m_000017_0,attempt_1672901779104_0001_m_000020_0,attempt_1672901779104_0001_m_000023_0 HTTP/1.1
+ keep alive headers

// HTTP Response Headers
content-length=sum(serialised ShuffleHeader in bytes + MapOutput size)
+ keep alive headers

// Response Data (transfer-encoding=chunked)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
...
LastHttpContent
// close socket if no keep-alive
{code}
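A rough sketch of how one response is assembled per the protocol above, assuming Netty 4.1 (contentLength, keepAlive, mapIds and the shuffleHeaderFor/chunkedFileFor helpers are hypothetical, not the actual ShuffleHandler code):
{code:java}
// Sketch only: mirrors the protocol above; the helpers are hypothetical.
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers()
    .set(HttpHeaderNames.CONTENT_LENGTH, contentLength)               // sum(headers + map outputs)
    .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
ch.write(response);
for (String mapId : mapIds) {
  ch.write(shuffleHeaderFor(mapId));   // serialised ShuffleHeader as a ByteBuf
  ch.write(chunkedFileFor(mapId));     // map output content (start offset, length)
}
ChannelFuture lastContent = ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) {
  lastContent.addListener(ChannelFutureListener.CLOSE);  // close socket if no keep-alive
}
{code}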
h3. Issues
 - {*}setResponseHeaders{*}: did not always set the content-length, and the transfer-encoding=chunked header was missing.
 - {*}ReduceMapFileCount.operationComplete{*}: mishandled the futures on the LastHttpContent.
 - {*}ChannelGroup accepted{*}: is only used to close the channels; there is no need for the magic number 5. See the details [here|https://netty.io/4.0/api/io/netty/channel/group/ChannelGroup.html].
 - {*}bossGroup{*}: should have only 1 thread for accepting connections (see the bootstrap sketch after this list).
 - {*}Shuffle{*}: is unnecessarily Sharable; the 3 async sendMap calls per channel (see below) caused future errors when using FadvisedChunkedFile.
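
A minimal sketch of the suggested bootstrap wiring, assuming Netty 4.1 (imports elided; maxShuffleThreads is assumed to come from configuration, and ShuffleChannelInitializer refers to the class proposed below):
{code:java}
// Sketch only, not the actual patch.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);            // 1 accept thread is enough
EventLoopGroup workerGroup = new NioEventLoopGroup(maxShuffleThreads);
// only used to close all accepted channels on shutdown, no magic 5 needed
ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ShuffleChannelInitializer(/* ... */));    // proposed below
{code}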

h3. Max session open files is not an optimisation, it actually wastes resources
{code:java}
    // by default maxSessionOpenFiles = 3
    for (int i = 0; i < Math.min(handlerCtx.maxSessionOpenFiles, mapIds.size()); i++) {
      ChannelFuture nextMap = sendMap(reduceContext);
      if (nextMap == null) {
        return;
      }
    }
{code}
!sendMapPipeline.png!

Ultimately, we create an HTTP chunked stream, so there is no need to run 3 sendMap calls asynchronously: the futures finish one by one, sequentially. The osCache magic from the FAdvised classes will not happen either, because the first readChunk is only called later.

So this can be simplified a lot:
{code:java}
sendMap(reduceContext);
{code}
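For context, a simplified sketch of how the writes chain sequentially (the listener logic is illustrative, and hasMoreMapIds is a hypothetical helper, not the real API):
{code:java}
// Illustrative only: each write future triggers the next sendMap, so the
// map outputs are streamed one by one; hasMoreMapIds() is hypothetical.
ChannelFuture future = sendMap(reduceContext);
future.addListener((ChannelFutureListener) f -> {
  if (!f.isSuccess()) {
    f.channel().close();         // e.g. broken pipe, closed channel
  } else if (reduceContext.hasMoreMapIds()) {
    sendMap(reduceContext);      // next ShuffleHeader + map output chunk
  }
});
{code}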
h3. My proposal

Some refactoring: ShuffleHandler is split into multiple classes to make it possible to remove the Sharable annotation:
 - ShuffleChannel
 - ShuffleChannelInitializer
 - ShuffleChannelHandlerContext
 - ShuffleChannelHandler

TODO:
 - fix/drop/refactor the existing unit tests
 - add proper unit tests for both SSL and non-SSL mode where the response data is properly verified
 - document the protocol

WIP: [github.com/tomicooler/hadoop|https://github.com/tomicooler/hadoop/commit/3bc027598aea4a3b02a1997fe5d485b9a6e5c41e]
h3. Netty useful docs
 * [User guide for 4.x|https://netty.io/wiki/user-guide-for-4.x.html]
 * [New and noteworthy in 4.0|https://netty.io/wiki/new-and-noteworthy-in-4.0.html]
 * [Reference counted objects|https://netty.io/wiki/reference-counted-objects.html] (reference counting will change in [Netty 5|https://netty.io/wiki/new-and-noteworthy-in-5.0.html])
 * HttpStaticFileServer [example|https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java]


