Posted to user@spark.apache.org by Manish Malhotra <ma...@gmail.com> on 2017/05/20 05:54:21 UTC

Spark Streaming: Custom Receiver OOM consistently

Hello,

I have implemented a Java-based custom receiver that consumes from a
messaging system, say JMS.
Once a message is received, I call store(object); I'm storing a Spark Row
object.

It runs for around 8 hours and then goes OOM, and the OOM happens on the
receiver nodes.
I also tried running multiple receivers to distribute the load, but I face
the same issue.

We must be missing something fundamental that tells the custom
receiver/Spark to release the memory,
but I have not been able to crack it, at least so far.

Any help is appreciated!

Regards,
Manish

Re: Spark Streaming: Custom Receiver OOM consistently

Posted by Manish Malhotra <ma...@gmail.com>.
Thanks!

On Mon, May 22, 2017 at 5:58 PM kant kodali <ka...@gmail.com> wrote:

> Well, there are a few things here.
>
> 1. What is the Spark version?
>
1.6 (CDH distribution).

> 2. You said there is an OOM error, but what cause appears in the
> log message or stack trace? OOM can happen for various reasons, and the
> JVM usually specifies the cause in the error message.
>
GC heap reached. I will send some logs as well.

> 3. What are the driver and executor memory settings?
>
Driver: 4g
Executor: 40g

> 4. What is the receive throughput per second, and what is the size of
> an average message?
>
Message size: 2 KB
Throughput: 10,000 messages/sec per receiver, with 2 receivers running.

> 5. What OS are you using?
>
Red Hat Linux.

> StorageLevel.MEMORY_AND_DISK_SER_2: this means that after the receiver
> receives the data, it is replicated across worker nodes.
>
Yes, but after a batch is finished, or after a few batches, shouldn't the
receiver and worker nodes discard the old data?
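
For scale, the figures given above can be turned into a rough ingest
estimate. The Java sketch below is only back-of-the-envelope arithmetic. It
assumes 1 KB = 1024 bytes and 40g = 40 GiB, uses the raw message size
(ignoring Row object and serialization overhead), and treats the whole
executor heap as available for received data, which it is not in practice.
The class and method names are made up for illustration.

```java
// Back-of-the-envelope ingest estimate from the figures quoted in this thread.
// Assumptions (not stated in the thread): 1 KB = 1024 bytes, 40g = 40 GiB,
// raw message size only, and the full heap counted as usable capacity.
class IngestEstimate {

    // Total bytes received per second across the given number of receivers.
    static long bytesPerSecond(long messageBytes, long messagesPerSecond, int receivers) {
        return messageBytes * messagesPerSecond * receivers;
    }

    // Seconds until capacityBytes is reached if nothing is ever released.
    static double secondsToFill(long capacityBytes, long bytesPerSecond) {
        return (double) capacityBytes / bytesPerSecond;
    }

    public static void main(String[] args) {
        long messageBytes = 2L * 1024;                  // "Msg size : 2KB"
        long perReceiverRate = 10_000;                  // "10000/sec per receiver"
        long executorBytes = 40L * 1024 * 1024 * 1024;  // "Executor: 40g"
        long eightHours = 8L * 60 * 60;                 // "around 8 hrs", in seconds

        long oneReceiver = bytesPerSecond(messageBytes, perReceiverRate, 1);
        long twoReceivers = bytesPerSecond(messageBytes, perReceiverRate, 2);
        double mib = 1024.0 * 1024.0;
        double gib = mib * 1024.0;

        System.out.printf("one receiver:  %.1f MiB/s%n", oneReceiver / mib);
        System.out.printf("two receivers: %.1f MiB/s%n", twoReceivers / mib);
        System.out.printf("received by one receiver in 8 h: %.1f GiB%n",
                oneReceiver * eightHours / gib);
        System.out.printf("time to fill 40 GiB at one receiver's rate: %.1f min%n",
                secondsToFill(executorBytes, oneReceiver) / 60.0);
    }
}
```

Under these assumptions one receiver takes in about 19.5 MiB/s, and a 40 GiB
heap would fill in roughly 35 minutes if nothing were ever released. Since
the job runs for about 8 hours before failing, this suggests (but does not
prove) that most received data is being cleaned up and that memory grows
much more slowly than the ingest rate.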



Re: Spark Streaming: Custom Receiver OOM consistently

Posted by kant kodali <ka...@gmail.com>.
Well, there are a few things here.

1. What is the Spark version?
2. You said there is an OOM error, but what cause appears in the
log message or stack trace? OOM can happen for various reasons, and the
JVM usually specifies the cause in the error message.
3. What are the driver and executor memory settings?
4. What is the receive throughput per second, and what is the size of
an average message?
5. What OS are you using?

StorageLevel.MEMORY_AND_DISK_SER_2: this means that after the receiver
receives the data, it is replicated across worker nodes.
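
Regarding point 2, the text that follows "java.lang.OutOfMemoryError:" in
the log usually narrows things down. The Java sketch below maps some
commonly seen HotSpot messages to a rough category. It is illustrative
only: the class name, the category wording, and the selection of messages
are assumptions made here, the list is not exhaustive, and the exact message
text can vary between JVM versions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: OomCause and its category strings are hypothetical names.
class OomCause {

    // Substrings of messages commonly printed after "java.lang.OutOfMemoryError:".
    private static final Map<String, String> CATEGORIES = new LinkedHashMap<>();
    static {
        CATEGORIES.put("GC overhead limit exceeded",
                "heap: almost all time spent in GC, little reclaimed");
        CATEGORIES.put("Java heap space",
                "heap: an allocation did not fit in the Java heap");
        CATEGORIES.put("Direct buffer memory",
                "off-heap: direct ByteBuffer memory exhausted");
        CATEGORIES.put("Metaspace",
                "class metadata: Metaspace exhausted");
        CATEGORIES.put("PermGen space",
                "class metadata: PermGen exhausted (Java 7 and earlier)");
        CATEGORIES.put("native thread",
                "threads: the OS refused to create another thread");
    }

    // Returns a rough category for an OutOfMemoryError message line.
    static String categorize(String errorMessage) {
        if (errorMessage != null) {
            for (Map.Entry<String, String> entry : CATEGORIES.entrySet()) {
                if (errorMessage.contains(entry.getKey())) {
                    return entry.getValue();
                }
            }
        }
        return "unknown: inspect the full stack trace";
    }

    public static void main(String[] args) {
        System.out.println(categorize("java.lang.OutOfMemoryError: GC overhead limit exceeded"));
        System.out.println(categorize("java.lang.OutOfMemoryError: Java heap space"));
    }
}
```

The two heap categories point at objects being retained on the Java heap,
while the others point away from it, which is why the exact message matters
before tuning executor memory.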




On Mon, May 22, 2017 at 5:20 PM, Manish Malhotra <
manish.malhotra.work@gmail.com> wrote:

> Thanks Alonso,
>
> Sorry, but there are some security reservations about sharing the code.
>
> But we can assume the receiver is equivalent to a JMS-based custom
> receiver in which we register a message listener, and each message
> delivered by JMS is stored by calling the store method from the listener.
>
>
> Something like:
> https://github.com/tbfenet/spark-jms-receiver/blob/master/src/main/scala/org/apache/spark/streaming/jms/JmsReceiver.scala
>
> The difference is that this JMS receiver uses a block generator and then
> calls store.
> I am calling store as soon as I receive a message,
> and I'm not using a block generator.
> I'm not sure whether that could make the memory balloon.
>
> By the way, I also ran the same message consumer code from standalone map
> and never saw this memory issue.

Re: Spark Streaming: Custom Receiver OOM consistently

Posted by Manish Malhotra <ma...@gmail.com>.
Thanks Alonso,

Sorry, but there are some security reservations about sharing the code.

But we can assume the receiver is equivalent to a JMS-based custom
receiver in which we register a message listener, and each message
delivered by JMS is stored by calling the store method from the listener.


Something like:
https://github.com/tbfenet/spark-jms-receiver/blob/master/src/main/scala/org/apache/spark/streaming/jms/JmsReceiver.scala

The difference is that this JMS receiver uses a block generator and then
calls store.
I am calling store as soon as I receive a message,
and I'm not using a block generator.
I'm not sure whether that could make the memory balloon.

By the way, I also ran the same message consumer code from standalone map
and never saw this memory issue.
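
One pattern sometimes used with listener-based consumers is to put a bounded
queue between the listener and the code that stores data, and to store
records in small batches. The Java sketch below shows only that hand-off,
using the standard library. BatchingBridge and its sink are hypothetical
names; in a real receiver the sink would be expected to wrap one of the
multi-record Receiver.store(...) variants (for example the one taking a
java.util.Iterator), which Spark's custom receiver guide describes as
blocking until the records are stored. Whether this has any bearing on the
OOM reported in this thread is not established here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch only: BatchingBridge and "sink" are hypothetical stand-ins, not Spark API.
class BatchingBridge<T> {
    private final BlockingQueue<T> queue;
    private final int maxBatch;
    private final Consumer<List<T>> sink;

    BatchingBridge(int capacity, int maxBatch, Consumer<List<T>> sink) {
        if (maxBatch < 1) {
            throw new IllegalArgumentException("maxBatch must be >= 1");
        }
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.maxBatch = maxBatch;
        this.sink = sink;
    }

    // Called from the listener thread. Blocks while the queue is full, so a
    // slow sink slows the consumer down instead of growing the heap.
    // Returns false if the thread was interrupted while waiting.
    boolean onMessage(T message) {
        try {
            queue.put(message);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Called in a loop from a separate storing thread. Waits up to the given
    // timeout for one record, then drains up to maxBatch records in total and
    // hands them to the sink. Returns the number of records handed over.
    int flushOnce(long timeout, TimeUnit unit) {
        T first;
        try {
            first = queue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 0;
        }
        if (first == null) {
            return 0;
        }
        List<T> batch = new ArrayList<>(maxBatch);
        batch.add(first);
        queue.drainTo(batch, maxBatch - 1);
        sink.accept(batch);
        return batch.size();
    }

    public static void main(String[] args) {
        List<List<String>> stored = new ArrayList<>();
        BatchingBridge<String> bridge = new BatchingBridge<>(8, 3, stored::add);
        for (int i = 0; i < 5; i++) {
            bridge.onMessage("msg-" + i);
        }
        while (bridge.flushOnce(10, TimeUnit.MILLISECONDS) > 0) {
            // keep draining until the queue is empty
        }
        System.out.println(stored); // prints [[msg-0, msg-1, msg-2], [msg-3, msg-4]]
    }
}
```

The queue capacity is the design choice that matters: it caps how many
received but not yet stored messages can sit on the heap at any time.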

On Sun, May 21, 2017 at 10:20 AM, Alonso Isidoro Roman <al...@gmail.com>
wrote:

> Could you share the code?

Re: Spark Streaming: Custom Receiver OOM consistently

Posted by Alonso Isidoro Roman <al...@gmail.com>.
Could you share the code?

Alonso Isidoro Roman