Posted to users@kafka.apache.org by Sourabh Chandak <so...@gmail.com> on 2015/09/24 22:25:46 UTC

ERROR BoundedByteBufferReceive: OOME with size 352518400

Hi,

I have ported the receiverless Spark Streaming integration for Kafka to Spark 1.2 and am
trying to run a Spark Streaming job to consume data from my broker, but I
am getting the following error:

15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
        at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
        at org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
        at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
        at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



I have tried allocating 100G of memory with 1 executor but it is still
failing.

Spark version: 1.2.2
Kafka version ported: 0.8.2
Kafka server version: trunk version with SSL enabled
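
For reference, the stream is created along these lines (a minimal sketch of
the KafkaUtils.createDirectStream call; the app name, broker address, batch
interval, and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaTest")
val ssc = new StreamingContext(conf, Seconds(10))

// Direct (receiverless) stream; metadata.broker.list names the broker the
// driver queries for partition metadata and leader offsets.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))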

Can someone please help me debug this?

Thanks,
Sourabh

Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

Posted by Sourabh Chandak <so...@gmail.com>.
Thanks, Cody. I was able to figure out the issue yesterday after sending the
last email.

On Friday, September 25, 2015, Cody Koeninger <co...@koeninger.org> wrote:

> So you're still having a problem getting partitions or offsets from kafka
> when creating the stream.  You can try each of those kafka operations
> individually (getPartitions / getLatestLeaderOffsets)
>
> checkErrors should be dealing with an arraybuffer of throwables, not just
> a single one.  Is that the only error you're seeing, or are there more?
>
> You can also modify it to call printStackTrace or whatever on each
> individual error, instead of only printing the message.
>
>
>
>
> On Thu, Sep 24, 2015 at 5:00 PM, Sourabh Chandak <sourabh3934@gmail.com> wrote:
>
>> I was able to get past this issue. I was pointing at the SSL port, whereas
>> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
>> am getting the following error:
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> java.nio.BufferUnderflowException
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at scala.util.Either.fold(Either.scala:97)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>         at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
>>         at
>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at
>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> Thanks,
>> Sourabh
>>
>> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>
>>> That looks like the OOM is in the driver, when getting partition
>>> metadata to create the direct stream.  In that case, executor memory
>>> allocation doesn't matter.
>>>
>>> Allocate more driver memory, or put a profiler on it to see what's
>>> taking up heap.
>>>
>>>
>>>
>>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <sourabh3934@gmail.com> wrote:
>>>
>>>> Adding Cody and Sriharsha
>>>>
>>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <sourabh3934@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have ported the receiverless Spark Streaming integration for Kafka to Spark 1.2 and
>>>>> am trying to run a Spark Streaming job to consume data from my broker, but
>>>>> I am getting the following error:
>>>>>
>>>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
>>>>> 352518400
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>>>>         at
>>>>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>>>>         at
>>>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>>>>         at
>>>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>         at
>>>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>         at
>>>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>>>>         at
>>>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>>>>         at
>>>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>>>>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>>>>         at
>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>         at
>>>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>>         at org.apache.spark.streaming.kafka.KafkaCluster.org
>>>>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>>>>         at
>>>>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>>>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>>>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>         at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>         at
>>>>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>>>         at
>>>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>>
>>>>>
>>>>>
>>>>> I have tried allocating 100G of memory with 1 executor but it is still
>>>>> failing.
>>>>>
>>>>> Spark version: 1.2.2
>>>>> Kafka version ported: 0.8.2
>>>>> Kafka server version: trunk version with SSL enabled
>>>>>
>>>>> Can someone please help me debug this.
>>>>>
>>>>> Thanks,
>>>>> Sourabh
>>>>>
>>>>
>>>>
>>>
>>
>

Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

Posted by Cody Koeninger <co...@koeninger.org>.
So you're still having a problem getting partitions or offsets from Kafka
when creating the stream. You can try each of those Kafka operations
individually (getPartitions / getLatestLeaderOffsets).

checkErrors should be dealing with an ArrayBuffer of Throwables, not just a
single one. Is that the only error you're seeing, or are there more?

You can also modify it to call printStackTrace or whatever on each
individual error, instead of only printing the message.
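
A minimal sketch of trying those calls directly (this assumes the ported
KafkaCluster class is visible to your code, since in stock Spark it is
private[spark]; the broker address and topic name are placeholders):

import org.apache.spark.streaming.kafka.KafkaCluster

val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092"))

// Exercise each metadata operation on its own, and dump every accumulated
// Throwable in full instead of only the concatenated messages.
kc.getPartitions(Set("mytopic")) match {
  case Left(errs) => errs.foreach(_.printStackTrace())
  case Right(partitions) =>
    kc.getLatestLeaderOffsets(partitions) match {
      case Left(errs)     => errs.foreach(_.printStackTrace())
      case Right(offsets) => println(offsets)
    }
}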




On Thu, Sep 24, 2015 at 5:00 PM, Sourabh Chandak <so...@gmail.com>
wrote:

> I was able to get past this issue. I was pointing at the SSL port, whereas
> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
> am getting the following error:
>
> Exception in thread "main" org.apache.spark.SparkException:
> java.nio.BufferUnderflowException
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at scala.util.Either.fold(Either.scala:97)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
>         at
> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks,
> Sourabh
>
> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> That looks like the OOM is in the driver, when getting partition metadata
>> to create the direct stream.  In that case, executor memory allocation
>> doesn't matter.
>>
>> Allocate more driver memory, or put a profiler on it to see what's taking
>> up heap.
>>
>>
>>
>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <so...@gmail.com>
>> wrote:
>>
>>> Adding Cody and Sriharsha
>>>
>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <so...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have ported the receiverless Spark Streaming integration for Kafka to Spark 1.2 and
>>>> am trying to run a Spark Streaming job to consume data from my broker, but
>>>> I am getting the following error:
>>>>
>>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
>>>> 352518400
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>>>         at
>>>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>>>         at
>>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>>>         at
>>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>         at
>>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>         at
>>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>>>         at
>>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>>>         at
>>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>>>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>>>         at
>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>         at
>>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>         at org.apache.spark.streaming.kafka.KafkaCluster.org
>>>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>>>         at
>>>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>
>>>>
>>>>
>>>> I have tried allocating 100G of memory with 1 executor but it is still
>>>> failing.
>>>>
>>>> Spark version: 1.2.2
>>>> Kafka version ported: 0.8.2
>>>> Kafka server version: trunk version with SSL enabled
>>>>
>>>> Can someone please help me debug this.
>>>>
>>>> Thanks,
>>>> Sourabh
>>>>
>>>
>>>
>>
>

Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

Posted by Sourabh Chandak <so...@gmail.com>.
Here is the code snippet, starting at line 365 in KafkaCluster.scala:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkException

type Err = ArrayBuffer[Throwable]

/** If the result is Right, return it; otherwise throw a SparkException. */
def checkErrors[T](result: Either[Err, T]): T = {
  result.fold(
    // mkString's argument is the separator placed between the error messages
    errs => throw new SparkException(errs.mkString("Throwing this error\n")),
    ok => ok
  )
}
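
One way to get the full stack traces Cody mentioned is to modify this to
print each accumulated Throwable before throwing (a sketch of a local
modification, not the stock Spark code):

def checkErrors[T](result: Either[Err, T]): T = {
  result.fold(
    errs => {
      // Dump every underlying error in full, not just its message
      errs.foreach(_.printStackTrace())
      throw new SparkException(errs.mkString("\n"))
    },
    ok => ok
  )
}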



On Thu, Sep 24, 2015 at 3:00 PM, Sourabh Chandak <so...@gmail.com>
wrote:

> I was able to get past this issue. I was pointing at the SSL port, whereas
> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
> am getting the following error:
>
> Exception in thread "main" org.apache.spark.SparkException:
> java.nio.BufferUnderflowException
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at scala.util.Either.fold(Either.scala:97)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
>         at
> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks,
> Sourabh
>
> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> That looks like the OOM is in the driver, when getting partition metadata
>> to create the direct stream.  In that case, executor memory allocation
>> doesn't matter.
>>
>> Allocate more driver memory, or put a profiler on it to see what's taking
>> up heap.
>>
>>
>>
>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <so...@gmail.com>
>> wrote:
>>
>>> Adding Cody and Sriharsha
>>>
>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <so...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have ported the receiverless Spark Streaming integration for Kafka to Spark 1.2 and
>>>> am trying to run a Spark Streaming job to consume data from my broker, but
>>>> I am getting the following error:
>>>>
>>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
>>>> 352518400
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>>>         at
>>>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>>>         at
>>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>>>         at
>>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>         at
>>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>         at
>>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>>>         at
>>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>>>         at
>>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>>>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>>>         at
>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>         at
>>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>         at org.apache.spark.streaming.kafka.KafkaCluster.org
>>>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>>>         at
>>>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>
>>>>
>>>>
>>>> I have tried allocating 100G of memory with 1 executor but it is still
>>>> failing.
>>>>
>>>> Spark version: 1.2.2
>>>> Kafka version ported: 0.8.2
>>>> Kafka server version: trunk version with SSL enabled
>>>>
>>>> Can someone please help me debug this.
>>>>
>>>> Thanks,
>>>> Sourabh
>>>>
>>>
>>>
>>
>

Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

Posted by Sourabh Chandak <so...@gmail.com>.
I was able to get past this issue. I was pointing at the SSL port, whereas
SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
am getting the following error:

Exception in thread "main" org.apache.spark.SparkException:
java.nio.BufferUnderflowException
        at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
        at
org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
        at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
        at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
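
For reference, the port fix above amounts to making sure metadata.broker.list
names the broker's PLAINTEXT listener; a minimal sketch, with placeholder
host and ports:

// SimpleConsumer speaks the unencrypted protocol, so point it at the
// PLAINTEXT port (e.g. 9092), not the SSL listener (e.g. 9093).
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")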

Thanks,
Sourabh

On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <co...@koeninger.org> wrote:

> That looks like the OOM is in the driver, when getting partition metadata
> to create the direct stream.  In that case, executor memory allocation
> doesn't matter.
>
> Allocate more driver memory, or put a profiler on it to see what's taking
> up heap.
>
>
>
> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <so...@gmail.com>
> wrote:
>
>> Adding Cody and Sriharsha
>>
>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <so...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have ported the receiverless Spark Streaming integration for Kafka to Spark 1.2 and
>>> am trying to run a Spark Streaming job to consume data from my broker, but
>>> I am getting the following error:
>>>
>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
>>> 352518400
>>> java.lang.OutOfMemoryError: Java heap space
>>>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>>         at
>>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>>         at
>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>>         at
>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>         at
>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>         at
>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>>         at
>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>>         at
>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>>         at
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>         at
>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>         at org.apache.spark.streaming.kafka.KafkaCluster.org
>>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>>         at
>>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>         at
>>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>         at
>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>>
>>>
>>> I have tried allocating 100G of memory with 1 executor but it is still
>>> failing.
>>>
>>> Spark version: 1.2.2
>>> Kafka version ported: 0.8.2
>>> Kafka server version: trunk version with SSL enabled
>>>
>>> Can someone please help me debug this.
>>>
>>> Thanks,
>>> Sourabh
>>>
>>
>>
>

Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

Posted by Cody Koeninger <co...@koeninger.org>.
That looks like the OOM is in the driver, when getting partition metadata
to create the direct stream.  In that case, executor memory allocation
doesn't matter.

Allocate more driver memory, or put a profiler on it to see what's taking
up heap.
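
For example (a sketch; the memory size and jar name are placeholders), the
driver heap is sized with the --driver-memory flag on spark-submit:

import org.apache.spark.SparkConf

// spark-submit --driver-memory 8g --class org.ofe.weve.test.KafkaTest kafka-test.jar
//
// Note that in client mode the driver JVM is already running by the time
// SparkConf is read, so setting spark.driver.memory programmatically, as
// below, only takes effect in cluster mode:
val conf = new SparkConf().set("spark.driver.memory", "8g")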



On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <so...@gmail.com>
wrote:

> Adding Cody and Sriharsha
>
> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <so...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have ported the receiverless Spark Streaming integration for Kafka to Spark 1.2 and am
>> trying to run a Spark Streaming job to consume data from my broker, but I
>> am getting the following error:
>>
>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
>> java.lang.OutOfMemoryError: Java heap space
>>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>         at
>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>         at
>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>         at
>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>         at
>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>         at
>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>         at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>         at
>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>         at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>         at
>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>         at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>         at
>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at
>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>>
>> I have tried allocating 100G of memory with 1 executor but it is still
>> failing.
>>
>> Spark version: 1.2.2
>> Kafka version ported: 0.8.2
>> Kafka server version: trunk version with SSL enabled
>>
>> Can someone please help me debug this?
>>
>> Thanks,
>> Sourabh
>>
>
>

Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

Posted by Sourabh Chandak <so...@gmail.com>.
Adding Cody and Sriharsha

On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <so...@gmail.com>
wrote:

> Hi,
>
> I have ported the receiver-less Spark Streaming for Kafka to Spark 1.2 and
> am trying to run a Spark Streaming job to consume data from my broker, but
> I am getting the following error:
>
> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
> java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>         at
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>         at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>         at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>         at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>         at
> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
> I have tried allocating 100G of memory with 1 executor but it is still
> failing.
>
> Spark version: 1.2.2
> Kafka version ported: 0.8.2
> Kafka server version: trunk version with SSL enabled
>
> Can someone please help me debug this?
>
> Thanks,
> Sourabh
>
