You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Luis Ángel Vicente Sánchez <la...@gmail.com> on 2014/09/09 23:21:08 UTC

spark.cleaner.ttl and spark.streaming.unpersist

The executors of my spark streaming application are being killed due to
memory issues. The memory consumption is quite high on startup because is
the first run and there are quite a few events on the kafka queues that are
consumed at a rate of 100K events per sec.

I wonder if it's recommended to use spark.cleaner.ttl and
spark.streaming.unpersist together to mitigate that problem. And I also
wonder if new RDD are being batched while a RDD is being processed.

Regards,

Luis

Re: spark.cleaner.ttl and spark.streaming.unpersist

Posted by Tim Smith <se...@gmail.com>.
I switched from Yarn to StandAlone mode and haven't had OOM issue yet.
However, now I have Akka issues killing the executor:

2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895]
was not delivered. [2] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Before I switched from Yarn to Standalone, I tried looking at heaps of
running executors. What I found odd was that while both - jmap
histo:live and jmap histo showed heap usage in few hundreds of MBytes,
Yarn kept showing that memory utilization is in several Gigabytes -
eventually leading to the container being killed.

I would appreciate if someone can duplicate what I am seeing. Basically:
1. Tail your yarn container logs and see what it is reporting as
memory used by the JVM
2. In parallel, run "jmap -histo:live <pid>" or "jmap histo <pid>" on
the executor process.

They should be about the same, right?

Also, in the heap dump, 99% of the heap seems to be occupied with
"unreachable objects" (and most of it is byte arrays).




On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith <se...@gmail.com> wrote:
> Actually, I am not doing any explicit shuffle/updateByKey or other
> transform functions. In my program flow, I take in data from Kafka,
> match each message against a list of regex and then if a msg matches a
> regex then extract groups, stuff them in json and push out back to
> kafka (different topic). So there is really no dependency between two
> messages in terms of processing. Here's my container histogram:
> http://pastebin.com/s3nAT3cY
>
> Essentially, my app is a cluster grep on steroids.
>
>
>
> On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska <ya...@gmail.com> wrote:
>> Tim, I asked a similar question twice:
>> here
>> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
>> and here
>> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html
>>
>> and have not yet received any responses. I noticed that the heapdump only
>> contains a very large byte array consuming about 66%(the second link
>> contains a picture of my heap -- I ran with a small heap to be able to get
>> the failure quickly)
>>
>> I don't have solutions but wanted to affirm that I've observed a similar
>> situation...
>>
>> On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith <se...@gmail.com> wrote:
>>>
>>> I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
>>> the receivers die within an hour because Yarn kills the containers for high
>>> memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
>>> don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
>>> of running receiver processes and in a heap of 30G, roughly ~16G is taken by
>>> "[B" which is byte arrays.
>>>
>>> Still investigating more and would appreciate pointers for
>>> troubleshooting. I have dumped the heap of a receiver and will try to go
>>> over it.
>>>
>>>
>>>
>>>
>>> On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez
>>> <la...@gmail.com> wrote:
>>>>
>>>> I somehow missed that parameter when I was reviewing the documentation,
>>>> that should do the trick! Thank you!
>>>>
>>>> 2014-09-10 2:10 GMT+01:00 Shao, Saisai <sa...@intel.com>:
>>>>
>>>>> Hi Luis,
>>>>>
>>>>>
>>>>>
>>>>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>>>>> used to remove useless timeout streaming data, the difference is that
>>>>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
>>>>> input data, but also Spark’s useless metadata; while
>>>>> “spark.streaming.unpersist” is reference-based cleaning mechanism, streaming
>>>>> data will be removed when out of slide duration.
>>>>>
>>>>>
>>>>>
>>>>> Both these two parameter can alleviate the memory occupation of Spark
>>>>> Streaming. But if the data is flooded into Spark Streaming when start up
>>>>> like your situation using Kafka, these two parameters cannot well mitigate
>>>>> the problem. Actually you need to control the input data rate to not inject
>>>>> so fast, you can try “spark.straming.receiver.maxRate” to control the inject
>>>>> rate.
>>>>>
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> Jerry
>>>>>
>>>>>
>>>>>
>>>>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
>>>>> Sent: Wednesday, September 10, 2014 5:21 AM
>>>>> To: user@spark.apache.org
>>>>> Subject: spark.cleaner.ttl and spark.streaming.unpersist
>>>>>
>>>>>
>>>>>
>>>>> The executors of my spark streaming application are being killed due to
>>>>> memory issues. The memory consumption is quite high on startup because is
>>>>> the first run and there are quite a few events on the kafka queues that are
>>>>> consumed at a rate of 100K events per sec.
>>>>>
>>>>> I wonder if it's recommended to use spark.cleaner.ttl and
>>>>> spark.streaming.unpersist together to mitigate that problem. And I also
>>>>> wonder if new RDD are being batched while a RDD is being processed.
>>>>>
>>>>> Regards,
>>>>>
>>>>> Luis
>>>>
>>>>
>>>
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: spark.cleaner.ttl and spark.streaming.unpersist

Posted by Tim Smith <se...@gmail.com>.
Actually, I am not doing any explicit shuffle/updateByKey or other
transform functions. In my program flow, I take in data from Kafka,
match each message against a list of regex and then if a msg matches a
regex then extract groups, stuff them in json and push out back to
kafka (different topic). So there is really no dependency between two
messages in terms of processing. Here's my container histogram:
http://pastebin.com/s3nAT3cY

Essentially, my app is a cluster grep on steroids.



On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska <ya...@gmail.com> wrote:
> Tim, I asked a similar question twice:
> here
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
> and here
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html
>
> and have not yet received any responses. I noticed that the heapdump only
> contains a very large byte array consuming about 66%(the second link
> contains a picture of my heap -- I ran with a small heap to be able to get
> the failure quickly)
>
> I don't have solutions but wanted to affirm that I've observed a similar
> situation...
>
> On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith <se...@gmail.com> wrote:
>>
>> I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
>> the receivers die within an hour because Yarn kills the containers for high
>> memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
>> don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
>> of running receiver processes and in a heap of 30G, roughly ~16G is taken by
>> "[B" which is byte arrays.
>>
>> Still investigating more and would appreciate pointers for
>> troubleshooting. I have dumped the heap of a receiver and will try to go
>> over it.
>>
>>
>>
>>
>> On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez
>> <la...@gmail.com> wrote:
>>>
>>> I somehow missed that parameter when I was reviewing the documentation,
>>> that should do the trick! Thank you!
>>>
>>> 2014-09-10 2:10 GMT+01:00 Shao, Saisai <sa...@intel.com>:
>>>
>>>> Hi Luis,
>>>>
>>>>
>>>>
>>>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>>>> used to remove useless timeout streaming data, the difference is that
>>>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
>>>> input data, but also Spark’s useless metadata; while
>>>> “spark.streaming.unpersist” is reference-based cleaning mechanism, streaming
>>>> data will be removed when out of slide duration.
>>>>
>>>>
>>>>
>>>> Both these two parameter can alleviate the memory occupation of Spark
>>>> Streaming. But if the data is flooded into Spark Streaming when start up
>>>> like your situation using Kafka, these two parameters cannot well mitigate
>>>> the problem. Actually you need to control the input data rate to not inject
>>>> so fast, you can try “spark.straming.receiver.maxRate” to control the inject
>>>> rate.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>> From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
>>>> Sent: Wednesday, September 10, 2014 5:21 AM
>>>> To: user@spark.apache.org
>>>> Subject: spark.cleaner.ttl and spark.streaming.unpersist
>>>>
>>>>
>>>>
>>>> The executors of my spark streaming application are being killed due to
>>>> memory issues. The memory consumption is quite high on startup because is
>>>> the first run and there are quite a few events on the kafka queues that are
>>>> consumed at a rate of 100K events per sec.
>>>>
>>>> I wonder if it's recommended to use spark.cleaner.ttl and
>>>> spark.streaming.unpersist together to mitigate that problem. And I also
>>>> wonder if new RDD are being batched while a RDD is being processed.
>>>>
>>>> Regards,
>>>>
>>>> Luis
>>>
>>>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: spark.cleaner.ttl and spark.streaming.unpersist

Posted by Yana Kadiyska <ya...@gmail.com>.
Tim, I asked a similar question twice:
here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
and here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html

and have not yet received any responses. I noticed that the heapdump only
contains a very large byte array consuming about 66%(the second link
contains a picture of my heap -- I ran with a small heap to be able to get
the failure quickly)

I don't have solutions but wanted to affirm that I've observed a similar
situation...

On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith <se...@gmail.com> wrote:

> I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
> the receivers die within an hour because Yarn kills the containers for high
> memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
> don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
> of running receiver processes and in a heap of 30G, roughly ~16G is taken
> by "[B" which is byte arrays.
>
> Still investigating more and would appreciate pointers for
> troubleshooting. I have dumped the heap of a receiver and will try to go
> over it.
>
>
>
>
> On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez <
> langel.groups@gmail.com> wrote:
>
>> I somehow missed that parameter when I was reviewing the documentation,
>> that should do the trick! Thank you!
>>
>> 2014-09-10 2:10 GMT+01:00 Shao, Saisai <sa...@intel.com>:
>>
>>  Hi Luis,
>>>
>>>
>>>
>>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>>> used to remove useless timeout streaming data, the difference is that
>>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
>>> input data, but also Spark’s useless metadata; while
>>> “spark.streaming.unpersist” is reference-based cleaning mechanism,
>>> streaming data will be removed when out of slide duration.
>>>
>>>
>>>
>>> Both these two parameter can alleviate the memory occupation of Spark
>>> Streaming. But if the data is flooded into Spark Streaming when start up
>>> like your situation using Kafka, these two parameters cannot well mitigate
>>> the problem. Actually you need to control the input data rate to not inject
>>> so fast, you can try “spark.straming.receiver.maxRate” to control the
>>> inject rate.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Jerry
>>>
>>>
>>>
>>> *From:* Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
>>> *Sent:* Wednesday, September 10, 2014 5:21 AM
>>> *To:* user@spark.apache.org
>>> *Subject:* spark.cleaner.ttl and spark.streaming.unpersist
>>>
>>>
>>>
>>> The executors of my spark streaming application are being killed due to
>>> memory issues. The memory consumption is quite high on startup because is
>>> the first run and there are quite a few events on the kafka queues that are
>>> consumed at a rate of 100K events per sec.
>>>
>>> I wonder if it's recommended to use spark.cleaner.ttl and
>>> spark.streaming.unpersist together to mitigate that problem. And I also
>>> wonder if new RDD are being batched while a RDD is being processed.
>>>
>>> Regards,
>>>
>>> Luis
>>>
>>
>>
>

Re: spark.cleaner.ttl and spark.streaming.unpersist

Posted by Tim Smith <se...@gmail.com>.
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
the receivers die within an hour because Yarn kills the containers for high
memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
of running receiver processes and in a heap of 30G, roughly ~16G is taken
by "[B" which is byte arrays.

Still investigating more and would appreciate pointers for troubleshooting.
I have dumped the heap of a receiver and will try to go over it.




On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez <
langel.groups@gmail.com> wrote:

> I somehow missed that parameter when I was reviewing the documentation,
> that should do the trick! Thank you!
>
> 2014-09-10 2:10 GMT+01:00 Shao, Saisai <sa...@intel.com>:
>
>  Hi Luis,
>>
>>
>>
>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>> used to remove useless timeout streaming data, the difference is that
>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
>> input data, but also Spark’s useless metadata; while
>> “spark.streaming.unpersist” is reference-based cleaning mechanism,
>> streaming data will be removed when out of slide duration.
>>
>>
>>
>> Both these two parameter can alleviate the memory occupation of Spark
>> Streaming. But if the data is flooded into Spark Streaming when start up
>> like your situation using Kafka, these two parameters cannot well mitigate
>> the problem. Actually you need to control the input data rate to not inject
>> so fast, you can try “spark.straming.receiver.maxRate” to control the
>> inject rate.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
>> *Sent:* Wednesday, September 10, 2014 5:21 AM
>> *To:* user@spark.apache.org
>> *Subject:* spark.cleaner.ttl and spark.streaming.unpersist
>>
>>
>>
>> The executors of my spark streaming application are being killed due to
>> memory issues. The memory consumption is quite high on startup because is
>> the first run and there are quite a few events on the kafka queues that are
>> consumed at a rate of 100K events per sec.
>>
>> I wonder if it's recommended to use spark.cleaner.ttl and
>> spark.streaming.unpersist together to mitigate that problem. And I also
>> wonder if new RDD are being batched while a RDD is being processed.
>>
>> Regards,
>>
>> Luis
>>
>
>

Re: spark.cleaner.ttl and spark.streaming.unpersist

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
I somehow missed that parameter when I was reviewing the documentation,
that should do the trick! Thank you!

2014-09-10 2:10 GMT+01:00 Shao, Saisai <sa...@intel.com>:

>  Hi Luis,
>
>
>
> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
> used to remove useless timeout streaming data, the difference is that
> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
> input data, but also Spark’s useless metadata; while
> “spark.streaming.unpersist” is reference-based cleaning mechanism,
> streaming data will be removed when out of slide duration.
>
>
>
> Both these two parameter can alleviate the memory occupation of Spark
> Streaming. But if the data is flooded into Spark Streaming when start up
> like your situation using Kafka, these two parameters cannot well mitigate
> the problem. Actually you need to control the input data rate to not inject
> so fast, you can try “spark.straming.receiver.maxRate” to control the
> inject rate.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
> *Sent:* Wednesday, September 10, 2014 5:21 AM
> *To:* user@spark.apache.org
> *Subject:* spark.cleaner.ttl and spark.streaming.unpersist
>
>
>
> The executors of my spark streaming application are being killed due to
> memory issues. The memory consumption is quite high on startup because is
> the first run and there are quite a few events on the kafka queues that are
> consumed at a rate of 100K events per sec.
>
> I wonder if it's recommended to use spark.cleaner.ttl and
> spark.streaming.unpersist together to mitigate that problem. And I also
> wonder if new RDD are being batched while a RDD is being processed.
>
> Regards,
>
> Luis
>

RE: spark.cleaner.ttl and spark.streaming.unpersist

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi Luis,

The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be used to remove useless timeout streaming data, the difference is that “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming input data, but also Spark’s useless metadata; while “spark.streaming.unpersist” is reference-based cleaning mechanism, streaming data will be removed when out of slide duration.

Both these two parameter can alleviate the memory occupation of Spark Streaming. But if the data is flooded into Spark Streaming when start up like your situation using Kafka, these two parameters cannot well mitigate the problem. Actually you need to control the input data rate to not inject so fast, you can try “spark.straming.receiver.maxRate” to control the inject rate.

Thanks
Jerry

From: Luis Ángel Vicente Sánchez [mailto:langel.groups@gmail.com]
Sent: Wednesday, September 10, 2014 5:21 AM
To: user@spark.apache.org
Subject: spark.cleaner.ttl and spark.streaming.unpersist

The executors of my spark streaming application are being killed due to memory issues. The memory consumption is quite high on startup because is the first run and there are quite a few events on the kafka queues that are consumed at a rate of 100K events per sec.

I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. And I also wonder if new RDD are being batched while a RDD is being processed.
Regards,

Luis