Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:14:05 UTC
[jira] [Resolved] (SPARK-21955) OneForOneStreamManager may leak memory when network is poor
[ https://issues.apache.org/jira/browse/SPARK-21955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-21955.
----------------------------------
Resolution: Incomplete
> OneForOneStreamManager may leak memory when network is poor
> -----------------------------------------------------------
>
> Key: SPARK-21955
> URL: https://issues.apache.org/jira/browse/SPARK-21955
> Project: Spark
> Issue Type: Bug
> Components: Block Manager
> Affects Versions: 1.6.1
> Environment: hdp 2.4.2.0-258
> spark 1.6
> Reporter: poseidon
> Priority: Major
> Labels: bulk-closed
> Attachments: screenshot-1.png
>
>
> While studying how streams, chunks, and blocks work in the Netty layer, I found a nasty case.
> Processing an OpenBlocks message registers a stream in OneForOneStreamManager:
> org.apache.spark.network.server.OneForOneStreamManager#registerStream
> This fills in the StreamState with the app ID and the buffers.
> Processing a ChunkFetchRequest registers the channel:
> org.apache.spark.network.server.OneForOneStreamManager#registerChannel
> This fills in the StreamState with the channel.
> In
> org.apache.spark.network.shuffle.OneForOneBlockFetcher#start
> OpenBlocks -> ChunkFetchRequest arrive in sequence.
> spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java
>   @Override
>   public void registerChannel(Channel channel, long streamId) {
>     if (streams.containsKey(streamId)) {
>       streams.get(streamId).associatedChannel = channel;
>     }
>   }
> This is the only place where associatedChannel is set.
>   public void connectionTerminated(Channel channel) {
>     // Close all streams which have been associated with the channel.
>     for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
>       StreamState state = entry.getValue();
>       if (state.associatedChannel == channel) {
>         streams.remove(entry.getKey());
>         // Release all remaining buffers.
>         while (state.buffers.hasNext()) {
>           state.buffers.next().release();
>         }
>       }
>     }
>   }
> This is the only place where the remaining state.buffers are released.
> If the network goes down while OpenBlocks is being processed, no ChunkFetchRequest message follows.
> So the channel is never set.
> As a result, we can see leaked buffers in OneForOneStreamManager:
> !screenshot-1.png!
> If org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set then, as far as I can tell from searching the code, the stream state stays in memory forever,
> which may lead to an OOM in the NodeManager.
> That is because the buffers are only released when the channel closes or when someone reads the last chunk of the block.
> To cover this case, we could set the channel in OneForOneStreamManager#registerStream.
> We should set the channel when we register the stream, so that the buffers can be released.
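
The scenario described above can be reproduced with a small self-contained model. The following is only a sketch under stated assumptions, not Spark's code: FakeChannel, FakeBuffer, SketchStreamManager and the extra channel argument on registerStream are all hypothetical names introduced for illustration. It mirrors the quoted registerChannel/connectionTerminated logic and compares what happens when the connection drops before any ChunkFetchRequest arrives, with and without a channel attached at registration time.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StreamLeakSketch {

    /** Hypothetical stand-in for a Netty Channel; only object identity matters. */
    static final class FakeChannel {}

    /** Hypothetical stand-in for a ManagedBuffer; remembers whether it was released. */
    static final class FakeBuffer {
        boolean released = false;
        void release() { released = true; }
    }

    /** Per-stream state, modelled on the fields mentioned in the report. */
    static final class StreamState {
        final Iterator<FakeBuffer> buffers;
        volatile FakeChannel associatedChannel; // stays null until a channel is attached

        StreamState(Iterator<FakeBuffer> buffers, FakeChannel channel) {
            this.buffers = buffers;
            this.associatedChannel = channel;
        }
    }

    /** Simplified model of the stream manager; not the real OneForOneStreamManager. */
    static final class SketchStreamManager {
        private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();
        private long nextStreamId = 0;

        // Assumption: the optional channel parameter models the proposed fix.
        // Passing null models the behaviour described in the report.
        long registerStream(Iterator<FakeBuffer> buffers, FakeChannel channelOrNull) {
            long streamId = nextStreamId++;
            streams.put(streamId, new StreamState(buffers, channelOrNull));
            return streamId;
        }

        void registerChannel(FakeChannel channel, long streamId) {
            StreamState state = streams.get(streamId);
            if (state != null) {
                state.associatedChannel = channel;
            }
        }

        void connectionTerminated(FakeChannel channel) {
            // Only streams already associated with this channel are cleaned up.
            for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
                StreamState state = entry.getValue();
                if (state.associatedChannel == channel) {
                    streams.remove(entry.getKey());
                    while (state.buffers.hasNext()) {
                        state.buffers.next().release();
                    }
                }
            }
        }
    }

    /** Returns how many buffers are still unreleased after the connection drops. */
    public static int unreleasedAfterDisconnect(boolean attachChannelAtRegistration) {
        List<FakeBuffer> buffers = Arrays.asList(new FakeBuffer(), new FakeBuffer());
        FakeChannel channel = new FakeChannel();
        SketchStreamManager manager = new SketchStreamManager();

        manager.registerStream(buffers.iterator(),
                attachChannelAtRegistration ? channel : null);
        // The connection drops before any ChunkFetchRequest,
        // so registerChannel is never called.
        manager.connectionTerminated(channel);

        int unreleased = 0;
        for (FakeBuffer buffer : buffers) {
            if (!buffer.released) {
                unreleased++;
            }
        }
        return unreleased;
    }

    public static void main(String[] args) {
        System.out.println("channel set later:           "
                + unreleasedAfterDisconnect(false) + " buffers unreleased");
        System.out.println("channel set at registration: "
                + unreleasedAfterDisconnect(true) + " buffers unreleased");
    }
}
```

In this model, the stream registered without a channel is skipped by connectionTerminated and keeps both buffers, while the stream registered with a channel is removed and its buffers are released. Whether the same change is appropriate in Spark itself depends on details the model leaves out, such as how the shuffle service obtains the channel while handling OpenBlocks.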
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)