Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:14:05 UTC

[jira] [Resolved] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor

     [ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-21955.
----------------------------------
    Resolution: Incomplete

> OneForOneStreamManager may leak memory when network is poor
> -----------------------------------------------------------
>
>                 Key: SPARK-21955
>                 URL: https://issues.apache.org/jira/browse/SPARK-21955
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager
>    Affects Versions: 1.6.1
>         Environment: hdp 2.4.2.0-258 
> spark 1.6 
>            Reporter: poseidon
>            Priority: Major
>              Labels: bulk-closed
>         Attachments: screenshot-1.png
>
>
> While learning how streams, chunks, and blocks work in the Netty transport layer, I found a nasty case.
> Processing an OpenBlocks message registers a stream in OneForOneStreamManager:
> org.apache.spark.network.server.OneForOneStreamManager#registerStream
> This fills the StreamState with the app ID and the buffers.
> Processing a ChunkFetchRequest registers the channel:
> org.apache.spark.network.server.OneForOneStreamManager#registerChannel
> This fills the StreamState with the channel.
> In
> org.apache.spark.network.shuffle.OneForOneBlockFetcher#start
> OpenBlocks and ChunkFetchRequest are sent in sequence: OpenBlocks first, then ChunkFetchRequest.
> From spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java:
>   @Override
>   public void registerChannel(Channel channel, long streamId) {
>     if (streams.containsKey(streamId)) {
>       streams.get(streamId).associatedChannel = channel;
>     }
>   }
> This is the only place where associatedChannel is set.
>   public void connectionTerminated(Channel channel) {
>     // Close all streams which have been associated with the channel.
>     for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
>       StreamState state = entry.getValue();
>       if (state.associatedChannel == channel) {
>         streams.remove(entry.getKey());
>         // Release all remaining buffers.
>         while (state.buffers.hasNext()) {
>           state.buffers.next().release();
>         }
>       }
>     }
>   }
> This is the only place where the remaining state.buffers are released.
> If the network goes down while the OpenBlocks message is being processed, no ChunkFetchRequest message follows.
> So associatedChannel is never set,
> and we can see some leaked buffers in OneForOneStreamManager:
> !screenshot-1.png!
> If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set, then, as far as I can tell from searching the code, the stream state remains in memory forever,
> which may lead to an OOM in the NodeManager,
> because the only ways to release it are a channel close or a client reading the last chunk of the block.
> We could set the channel in OneForOneStreamManager#registerStream to cover this case.
> That is, we should set the channel when we call registerStream, so that the buffers can be released.
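
To make the failure mode concrete, here is a minimal, self-contained Java sketch. It does not use Spark or Netty: FakeBuffer, FakeChannel, StreamManagerModel, unreleasedAfterDisconnect, and the three-argument registerStream overload are hypothetical stand-ins that only mimic the reference counting and the associatedChannel check quoted above. It assumes a stream is registered by OpenBlocks and the connection then drops before any ChunkFetchRequest arrives, and compares the current behavior with the proposal to record the channel at registration time.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of OneForOneStreamManager; NOT Spark's real API.
public class StreamLeakSketch {

  // Hypothetical stand-in for a reference-counted ManagedBuffer.
  static final class FakeBuffer {
    int refCnt = 1;
    void release() { refCnt--; }
  }

  // Hypothetical stand-in for a Netty Channel; only object identity matters here.
  static final class FakeChannel {}

  static final class StreamState {
    final String appId;
    final Iterator<FakeBuffer> buffers;
    FakeChannel associatedChannel; // stays null until registerChannel, unless set at registration

    StreamState(String appId, Iterator<FakeBuffer> buffers, FakeChannel channel) {
      this.appId = appId;
      this.buffers = buffers;
      this.associatedChannel = channel;
    }
  }

  static final class StreamManagerModel {
    final AtomicLong nextStreamId = new AtomicLong(0);
    final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

    // Current behavior: OpenBlocks registers the stream without a channel.
    long registerStream(String appId, Iterator<FakeBuffer> buffers) {
      return registerStream(appId, buffers, null);
    }

    // Proposed behavior (hypothetical overload): record the channel up front.
    long registerStream(String appId, Iterator<FakeBuffer> buffers, FakeChannel channel) {
      long id = nextStreamId.getAndIncrement();
      streams.put(id, new StreamState(appId, buffers, channel));
      return id;
    }

    // Modeled on registerChannel, which runs when a ChunkFetchRequest arrives.
    void registerChannel(FakeChannel channel, long streamId) {
      StreamState state = streams.get(streamId);
      if (state != null) {
        state.associatedChannel = channel;
      }
    }

    // Mirrors the connectionTerminated loop: only streams bound to this channel are released.
    void connectionTerminated(FakeChannel channel) {
      for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
        StreamState state = entry.getValue();
        if (state.associatedChannel == channel) {
          streams.remove(entry.getKey());
          while (state.buffers.hasNext()) {
            state.buffers.next().release();
          }
        }
      }
    }
  }

  // Registers one stream with two buffers, never calls registerChannel (no ChunkFetchRequest),
  // closes the connection, and returns how many buffers are still unreleased.
  static int unreleasedAfterDisconnect(boolean passChannelAtRegistration) {
    StreamManagerModel manager = new StreamManagerModel();
    FakeChannel channel = new FakeChannel();
    List<FakeBuffer> buffers = Arrays.asList(new FakeBuffer(), new FakeBuffer());
    if (passChannelAtRegistration) {
      manager.registerStream("app-1", buffers.iterator(), channel);
    } else {
      manager.registerStream("app-1", buffers.iterator());
    }
    manager.connectionTerminated(channel); // the network dropped right after OpenBlocks
    int unreleased = 0;
    for (FakeBuffer b : buffers) {
      if (b.refCnt > 0) {
        unreleased++;
      }
    }
    return unreleased;
  }

  public static void main(String[] args) {
    System.out.println("current:  " + unreleasedAfterDisconnect(false) + " buffers leaked");
    System.out.println("proposed: " + unreleasedAfterDisconnect(true) + " buffers leaked");
  }
}
```

In this model both buffers stay unreleased when the channel is only bound by registerChannel, because the null associatedChannel never matches the closing channel; when the channel is passed at registration, connectionTerminated releases both.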



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
