You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2015/08/21 11:06:08 UTC

spark streaming 1.3 kafka error

Hi


Getting below error in spark streaming 1.3 while consuming from kafka
using directkafka stream. Few of tasks are getting failed in each run.


What is the reason /solution of this error?


15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in
stage 130.0 (TID 16332)
java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.
	at kafka.utils.Utils$.read(Utils.scala:376)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 16348
15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage
130.0 (TID 16348)
15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic
test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties




Thanks

Re: spark streaming 1.3 kafka error

Posted by Cody Koeninger <co...@koeninger.org>.
To be perfectly clear, the direct kafka stream will also recover from any
failures, because it does the simplest thing possible - fail the task and
let spark retry it.

If you're consistently having socket closed problems on one task after
another, there's probably something else going on in your environment.
Shushant, none of your responses have indicated whether you've tried any of
the system level troubleshooting suggestions that have been made by various
people.

Also, if you have 300 partitions, and only 10mb of data, that is completely
unnecessary.  You're probably going to have lots of empty partitions, which
will have a negative effect on your runtime.

On Sat, Aug 22, 2015 at 11:28 AM, Dibyendu Bhattacharya <
dibyendu.bhattachary@gmail.com> wrote:

> I think you also can give a try to this consumer :
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer in your
> environment. This has been running fine for topic with large number of
> Kafka partition ( > 200 ) like yours without any issue.. no issue with
> connection as this consumer re-use kafka connection , and also can recover
> from any failures ( network loss , Kafka leader goes down, ZK down etc ..).
>
>
> Regards,
> Dibyendu
>
> On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> On trying the consumer without external connections  or with low number
>> of external conections its working fine -
>>
>> so doubt is how  socket got closed -
>>
>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>
>>
>>
>> On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Can you try some other consumer and see if the issue still exists?
>>> On Aug 22, 2015 12:47 AM, "Shushant Arora" <sh...@gmail.com>
>>> wrote:
>>>
>>>> Exception comes when client has so many connections to some another
>>>> external server also.
>>>> So I think Exception is coming because of client side issue only-
>>>> server side there is no issue.
>>>>
>>>>
>>>> Want to understand is executor(simple consumer) not making new
>>>> connection to kafka broker at start of each task ? Or is it created once
>>>> only and that is getting closed somehow ?
>>>>
>>>> On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> it comes at start of each tasks when there is new data inserted in
>>>>> kafka.( data inserted is very few)
>>>>> kafka topic has 300 partitions - data inserted is ~10 MB.
>>>>>
>>>>> Tasks gets failed and it retries which succeed and after certain no of
>>>>> fail tasks it kills the job.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <akhil@sigmoidanalytics.com
>>>>> > wrote:
>>>>>
>>>>>> That looks like you are choking your kafka machine. Do a top on the
>>>>>> kafka machines and see the workload, it may happen that you are spending
>>>>>> too much time on disk io etc.
>>>>>> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <co...@koeninger.org> wrote:
>>>>>>
>>>>>>> Sounds like that's happening consistently, not an occasional network
>>>>>>> problem?
>>>>>>>
>>>>>>> Look at the Kafka broker logs
>>>>>>>
>>>>>>> Make sure you've configured the correct kafka broker hosts / ports
>>>>>>> (note that direct stream does not use zookeeper host / port).
>>>>>>>
>>>>>>> Make sure that host / port is reachable from your driver and worker
>>>>>>> nodes, ie telnet or netcat to it.  It looks like your driver can reach it
>>>>>>> (since there's partition info in the logs), but that doesn't mean the
>>>>>>> worker can.
>>>>>>>
>>>>>>> Use lsof / netstat to see what's going on with those ports while the
>>>>>>> job is running, or tcpdump if you need to.
>>>>>>>
>>>>>>> If you can't figure out what's going on from a networking point of
>>>>>>> view, post a minimal reproducible code sample that demonstrates the issue,
>>>>>>> so it can be tested in a different environment.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>>
>>>>>>>> Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run.
>>>>>>>>
>>>>>>>>
>>>>>>>> What is the reason /solution of this error?
>>>>>>>>
>>>>>>>>
>>>>>>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>>>>>>>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>>>>>>> 	at kafka.utils.Utils$.read(Utils.scala:376)
>>>>>>>> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>>> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>>>> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>>>> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>>>>> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>>>>>>> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>>> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>>>>>>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>>>>>>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>>>>>>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>>>>>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>
>

Re: spark streaming 1.3 kafka error

Posted by Dibyendu Bhattacharya <di...@gmail.com>.
I think you also can give a try to this consumer :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer in your
environment. This has been running fine for topic with large number of
Kafka partition ( > 200 ) like yours without any issue.. no issue with
connection as this consumer re-use kafka connection , and also can recover
from any failures ( network loss , Kafka leader goes down, ZK down etc ..).


Regards,
Dibyendu

On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora <sh...@gmail.com>
wrote:

> On trying the consumer without external connections  or with low number of
> external conections its working fine -
>
> so doubt is how  socket got closed -
>
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>
>
>
> On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Can you try some other consumer and see if the issue still exists?
>> On Aug 22, 2015 12:47 AM, "Shushant Arora" <sh...@gmail.com>
>> wrote:
>>
>>> Exception comes when client has so many connections to some another
>>> external server also.
>>> So I think Exception is coming because of client side issue only- server
>>> side there is no issue.
>>>
>>>
>>> Want to understand is executor(simple consumer) not making new
>>> connection to kafka broker at start of each task ? Or is it created once
>>> only and that is getting closed somehow ?
>>>
>>> On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> it comes at start of each tasks when there is new data inserted in
>>>> kafka.( data inserted is very few)
>>>> kafka topic has 300 partitions - data inserted is ~10 MB.
>>>>
>>>> Tasks gets failed and it retries which succeed and after certain no of
>>>> fail tasks it kills the job.
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> That looks like you are choking your kafka machine. Do a top on the
>>>>> kafka machines and see the workload, it may happen that you are spending
>>>>> too much time on disk io etc.
>>>>> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <co...@koeninger.org> wrote:
>>>>>
>>>>>> Sounds like that's happening consistently, not an occasional network
>>>>>> problem?
>>>>>>
>>>>>> Look at the Kafka broker logs
>>>>>>
>>>>>> Make sure you've configured the correct kafka broker hosts / ports
>>>>>> (note that direct stream does not use zookeeper host / port).
>>>>>>
>>>>>> Make sure that host / port is reachable from your driver and worker
>>>>>> nodes, ie telnet or netcat to it.  It looks like your driver can reach it
>>>>>> (since there's partition info in the logs), but that doesn't mean the
>>>>>> worker can.
>>>>>>
>>>>>> Use lsof / netstat to see what's going on with those ports while the
>>>>>> job is running, or tcpdump if you need to.
>>>>>>
>>>>>> If you can't figure out what's going on from a networking point of
>>>>>> view, post a minimal reproducible code sample that demonstrates the issue,
>>>>>> so it can be tested in a different environment.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>>
>>>>>>> Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run.
>>>>>>>
>>>>>>>
>>>>>>> What is the reason /solution of this error?
>>>>>>>
>>>>>>>
>>>>>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>>>>>>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>>>>>> 	at kafka.utils.Utils$.read(Utils.scala:376)
>>>>>>> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>>> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>>> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>>>> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>>>>>> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>>>>>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>>>>>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>>>>>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>>>>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>

Re: spark streaming 1.3 kafka error

Posted by Shushant Arora <sh...@gmail.com>.
On trying the consumer without external connections  or with low
number of external conections its working fine -

so doubt is how  socket got closed -


15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in
stage 130.0 (TID 16332)
java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.
	at kafka.utils.Utils$.read(Utils.scala:376)


is executor(simple consumer) not making new connection to kafka broker at
start of each task ? Or is it created once only and that is getting closed
somehow ?

On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora <sh...@gmail.com>
wrote:

> On trying the consumer without external connections  or with low number of
> external conections its working fine -
>
> so doubt is how  socket got closed -
>
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>
>
>
> On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Can you try some other consumer and see if the issue still exists?
>> On Aug 22, 2015 12:47 AM, "Shushant Arora" <sh...@gmail.com>
>> wrote:
>>
>>> Exception comes when client has so many connections to some another
>>> external server also.
>>> So I think Exception is coming because of client side issue only- server
>>> side there is no issue.
>>>
>>>
>>> Want to understand is executor(simple consumer) not making new
>>> connection to kafka broker at start of each task ? Or is it created once
>>> only and that is getting closed somehow ?
>>>
>>> On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> it comes at start of each tasks when there is new data inserted in
>>>> kafka.( data inserted is very few)
>>>> kafka topic has 300 partitions - data inserted is ~10 MB.
>>>>
>>>> Tasks gets failed and it retries which succeed and after certain no of
>>>> fail tasks it kills the job.
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> That looks like you are choking your kafka machine. Do a top on the
>>>>> kafka machines and see the workload, it may happen that you are spending
>>>>> too much time on disk io etc.
>>>>> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <co...@koeninger.org> wrote:
>>>>>
>>>>>> Sounds like that's happening consistently, not an occasional network
>>>>>> problem?
>>>>>>
>>>>>> Look at the Kafka broker logs
>>>>>>
>>>>>> Make sure you've configured the correct kafka broker hosts / ports
>>>>>> (note that direct stream does not use zookeeper host / port).
>>>>>>
>>>>>> Make sure that host / port is reachable from your driver and worker
>>>>>> nodes, ie telnet or netcat to it.  It looks like your driver can reach it
>>>>>> (since there's partition info in the logs), but that doesn't mean the
>>>>>> worker can.
>>>>>>
>>>>>> Use lsof / netstat to see what's going on with those ports while the
>>>>>> job is running, or tcpdump if you need to.
>>>>>>
>>>>>> If you can't figure out what's going on from a networking point of
>>>>>> view, post a minimal reproducible code sample that demonstrates the issue,
>>>>>> so it can be tested in a different environment.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>>
>>>>>>> Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run.
>>>>>>>
>>>>>>>
>>>>>>> What is the reason /solution of this error?
>>>>>>>
>>>>>>>
>>>>>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>>>>>>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>>>>>> 	at kafka.utils.Utils$.read(Utils.scala:376)
>>>>>>> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>>> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>>> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>>>> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>>>>>> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>>>>>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>>>>>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>>>>>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>>>>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>

Re: spark streaming 1.3 kafka error

Posted by Shushant Arora <sh...@gmail.com>.
On trying the consumer without external connections  or with low number of
external conections its working fine -

so doubt is how  socket got closed -

java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.



On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Can you try some other consumer and see if the issue still exists?
> On Aug 22, 2015 12:47 AM, "Shushant Arora" <sh...@gmail.com>
> wrote:
>
>> Exception comes when client has so many connections to some another
>> external server also.
>> So I think Exception is coming because of client side issue only- server
>> side there is no issue.
>>
>>
>> Want to understand is executor(simple consumer) not making new connection
>> to kafka broker at start of each task ? Or is it created once only and that
>> is getting closed somehow ?
>>
>> On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> it comes at start of each tasks when there is new data inserted in
>>> kafka.( data inserted is very few)
>>> kafka topic has 300 partitions - data inserted is ~10 MB.
>>>
>>> Tasks gets failed and it retries which succeed and after certain no of
>>> fail tasks it kills the job.
>>>
>>>
>>>
>>>
>>> On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> That looks like you are choking your kafka machine. Do a top on the
>>>> kafka machines and see the workload, it may happen that you are spending
>>>> too much time on disk io etc.
>>>> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <co...@koeninger.org> wrote:
>>>>
>>>>> Sounds like that's happening consistently, not an occasional network
>>>>> problem?
>>>>>
>>>>> Look at the Kafka broker logs
>>>>>
>>>>> Make sure you've configured the correct kafka broker hosts / ports
>>>>> (note that direct stream does not use zookeeper host / port).
>>>>>
>>>>> Make sure that host / port is reachable from your driver and worker
>>>>> nodes, ie telnet or netcat to it.  It looks like your driver can reach it
>>>>> (since there's partition info in the logs), but that doesn't mean the
>>>>> worker can.
>>>>>
>>>>> Use lsof / netstat to see what's going on with those ports while the
>>>>> job is running, or tcpdump if you need to.
>>>>>
>>>>> If you can't figure out what's going on from a networking point of
>>>>> view, post a minimal reproducible code sample that demonstrates the issue,
>>>>> so it can be tested in a different environment.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>>>>> shushantarora09@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>>
>>>>>> Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run.
>>>>>>
>>>>>>
>>>>>> What is the reason /solution of this error?
>>>>>>
>>>>>>
>>>>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>>>>>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>>>>> 	at kafka.utils.Utils$.read(Utils.scala:376)
>>>>>> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>>> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>>>>> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>>>>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>>>>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>>>>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>>>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>
>>>
>>

Re: spark streaming 1.3 kafka error

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Can you try some other consumer and see if the issue still exists?
On Aug 22, 2015 12:47 AM, "Shushant Arora" <sh...@gmail.com>
wrote:

> Exception comes when client has so many connections to some another
> external server also.
> So I think Exception is coming because of client side issue only- server
> side there is no issue.
>
>
> Want to understand is executor(simple consumer) not making new connection
> to kafka broker at start of each task ? Or is it created once only and that
> is getting closed somehow ?
>
> On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> it comes at start of each tasks when there is new data inserted in
>> kafka.( data inserted is very few)
>> kafka topic has 300 partitions - data inserted is ~10 MB.
>>
>> Tasks gets failed and it retries which succeed and after certain no of
>> fail tasks it kills the job.
>>
>>
>>
>>
>> On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> That looks like you are choking your kafka machine. Do a top on the
>>> kafka machines and see the workload, it may happen that you are spending
>>> too much time on disk io etc.
>>> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <co...@koeninger.org> wrote:
>>>
>>>> Sounds like that's happening consistently, not an occasional network
>>>> problem?
>>>>
>>>> Look at the Kafka broker logs
>>>>
>>>> Make sure you've configured the correct kafka broker hosts / ports
>>>> (note that direct stream does not use zookeeper host / port).
>>>>
>>>> Make sure that host / port is reachable from your driver and worker
>>>> nodes, ie telnet or netcat to it.  It looks like your driver can reach it
>>>> (since there's partition info in the logs), but that doesn't mean the
>>>> worker can.
>>>>
>>>> Use lsof / netstat to see what's going on with those ports while the
>>>> job is running, or tcpdump if you need to.
>>>>
>>>> If you can't figure out what's going on from a networking point of
>>>> view, post a minimal reproducible code sample that demonstrates the issue,
>>>> so it can be tested in a different environment.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>>>> shushantarora09@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>>
>>>>> Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run.
>>>>>
>>>>>
>>>>> What is the reason /solution of this error?
>>>>>
>>>>>
>>>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>>>>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>>>> 	at kafka.utils.Utils$.read(Utils.scala:376)
>>>>> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>>>> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>>>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>>>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>>>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>
>>
>

Re: spark streaming 1.3 kafka error

Posted by Shushant Arora <sh...@gmail.com>.
Exception comes when client has so many connections to some another
external server also.
So I think Exception is coming because of client side issue only- server
side there is no issue.


Want to understand is executor(simple consumer) not making new connection
to kafka broker at start of each task ? Or is it created once only and that
is getting closed somehow ?

On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <sh...@gmail.com>
wrote:

> it comes at start of each tasks when there is new data inserted in kafka.(
> data inserted is very few)
> kafka topic has 300 partitions - data inserted is ~10 MB.
>
> Tasks gets failed and it retries which succeed and after certain no of
> fail tasks it kills the job.
>
>
>
>
> On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> That looks like you are choking your kafka machine. Do a top on the kafka
>> machines and see the workload, it may happen that you are spending too much
>> time on disk io etc.
>> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <co...@koeninger.org> wrote:
>>
>>> Sounds like that's happening consistently, not an occasional network
>>> problem?
>>>
>>> Look at the Kafka broker logs
>>>
>>> Make sure you've configured the correct kafka broker hosts / ports (note
>>> that direct stream does not use zookeeper host / port).
>>>
>>> Make sure that host / port is reachable from your driver and worker
>>> nodes, ie telnet or netcat to it.  It looks like your driver can reach it
>>> (since there's partition info in the logs), but that doesn't mean the
>>> worker can.
>>>
>>> Use lsof / netstat to see what's going on with those ports while the job
>>> is running, or tcpdump if you need to.
>>>
>>> If you can't figure out what's going on from a networking point of view,
>>> post a minimal reproducible code sample that demonstrates the issue, so it
>>> can be tested in a different environment.
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>>
>>>> Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run.
>>>>
>>>>
>>>> What is the reason /solution of this error?
>>>>
>>>>
>>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>>>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>>> 	at kafka.utils.Utils$.read(Utils.scala:376)
>>>> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>>> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>>
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>
>

Re: spark streaming 1.3 kafka error

Posted by Shushant Arora <sh...@gmail.com>.
it comes at start of each tasks when there is new data inserted in kafka.(
data inserted is very few)
kafka topic has 300 partitions - data inserted is ~10 MB.

Tasks gets failed and it retries which succeed and after certain no of fail
tasks it kills the job.




On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> That looks like you are choking your kafka machine. Do a top on the kafka
> machines and see the workload, it may happen that you are spending too much
> time on disk io etc.
> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <co...@koeninger.org> wrote:
>
>> Sounds like that's happening consistently, not an occasional network
>> problem?
>>
>> Look at the Kafka broker logs
>>
>> Make sure you've configured the correct kafka broker hosts / ports (note
>> that direct stream does not use zookeeper host / port).
>>
>> Make sure that host / port is reachable from your driver and worker
>> nodes, ie telnet or netcat to it.  It looks like your driver can reach it
>> (since there's partition info in the logs), but that doesn't mean the
>> worker can.
>>
>> Use lsof / netstat to see what's going on with those ports while the job
>> is running, or tcpdump if you need to.
>>
>> If you can't figure out what's going on from a networking point of view,
>> post a minimal reproducible code sample that demonstrates the issue, so it
>> can be tested in a different environment.
>>
>>
>>
>>
>>
>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> Hi
>>>
>>>
>>> Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run.
>>>
>>>
>>> What is the reason /solution of this error?
>>>
>>>
>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>> 	at kafka.utils.Utils$.read(Utils.scala:376)
>>> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> 	at java.lang.Thread.run(Thread.java:745)
>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>

Re: spark streaming 1.3 kafka error

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
That looks like you are choking your kafka machine. Do a top on the kafka
machines and see the workload, it may happen that you are spending too much
time on disk io etc.
On Aug 21, 2015 7:32 AM, "Cody Koeninger" <co...@koeninger.org> wrote:

> Sounds like that's happening consistently, not an occasional network
> problem?
>
> Look at the Kafka broker logs
>
> Make sure you've configured the correct kafka broker hosts / ports (note
> that direct stream does not use zookeeper host / port).
>
> Make sure that host / port is reachable from your driver and worker nodes,
> ie telnet or netcat to it.  It looks like your driver can reach it (since
> there's partition info in the logs), but that doesn't mean the worker can.
>
> Use lsof / netstat to see what's going on with those ports while the job
> is running, or tcpdump if you need to.
>
> If you can't figure out what's going on from a networking point of view,
> post a minimal reproducible code sample that demonstrates the issue, so it
> can be tested in a different environment.
>
>
>
>
>
> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> Hi
>>
>>
>> Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run.
>>
>>
>> What is the reason /solution of this error?
>>
>>
>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>> 	at kafka.utils.Utils$.read(Utils.scala:376)
>> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> 	at java.lang.Thread.run(Thread.java:745)
>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>
>>
>>
>>
>> Thanks
>>
>>
>

Re: spark streaming 1.3 kafka error

Posted by Cody Koeninger <co...@koeninger.org>.
Sounds like that's happening consistently, not an occasional network
problem?

Look at the Kafka broker logs

Make sure you've configured the correct kafka broker hosts / ports (note
that direct stream does not use zookeeper host / port).

Make sure that host / port is reachable from your driver and worker nodes,
ie telnet or netcat to it.  It looks like your driver can reach it (since
there's partition info in the logs), but that doesn't mean the worker can.

Use lsof / netstat to see what's going on with those ports while the job is
running, or tcpdump if you need to.

If you can't figure out what's going on from a networking point of view,
post a minimal reproducible code sample that demonstrates the issue, so it
can be tested in a different environment.





On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <sh...@gmail.com>
wrote:

> Hi
>
>
> Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run.
>
>
> What is the reason /solution of this error?
>
>
> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
> 	at kafka.utils.Utils$.read(Utils.scala:376)
> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>
>
>
>
> Thanks
>
>