Posted to issues@spark.apache.org by "Ruben Ramalho (JIRA)" <ji...@apache.org> on 2015/08/07 22:58:45 UTC

[jira] [Commented] (SPARK-9476) Kafka stream loses leader after 2h of operation

    [ https://issues.apache.org/jira/browse/SPARK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14662446#comment-14662446 ] 

Ruben Ramalho commented on SPARK-9476:
--------------------------------------

Sorry for the late reply; I promise to respond much more quickly from now on.

There aren't any error logs, but this problem compromises the normal operation of the analytics server.

Yes, simpler jobs do run in the same environment. This same setup runs correctly for two hours; it's only after 2h of operation that the problem arises, which is strange.
Unfortunately I cannot share the relevant code in full, but I can describe what I am doing. I consume positional updates from Apache Kafka, perform window operations over this data, and extract features. These features are then fed to machine learning algorithms, and the generated tips are fed back to Kafka (on a different topic). If you want specific parts of the code, I can provide them! A rough sketch of the pipeline shape is just below.
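
Here is roughly the shape of the job (not the actual code: extractFeatures, generateTip and sendTip are placeholders for our real feature, model and output code; the broker address and topic name are taken from the log in the issue description; batch and slide intervals are illustrative):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object PositionalUpdatesJob {
  // Placeholders for the real feature-extraction, model and output code.
  def extractFeatures(update: String): String = update
  def generateTip(features: String): String = features
  def sendTip(tip: String): Unit = ()  // would write to a second Kafka topic

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("positional-updates"), Seconds(10))

    // Direct stream over the "updates" topic.
    val kafkaParams = Map("metadata.broker.list" -> "192.168.3.23:3000")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("updates"))

    stream.map(_._2)
      .window(Minutes(60), Minutes(5))          // 1h window over the positional updates
      .map(extractFeatures)
      .map(generateTip)
      .foreachRDD(rdd => rdd.foreach(sendTip))  // tips go back to Kafka on a different topic

    ssc.start()
    ssc.awaitTermination()
  }
}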

I was using Apache Kafka 0.8.2.0 when this issue first appeared; I have since updated to 0.8.2.1 (in the hope that the problem would be fixed), but the issue persists. I think Apache Spark is at some point corrupting the Kafka topics, though I cannot isolate why that is happening. I have used both the Kafka direct stream and the regular (receiver-based) stream, and the problem persists with both; the two setups are sketched below.
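
Concretely, the two stream setups I have tried look roughly like this (same imports and StreamingContext as in the sketch above; the ZooKeeper address is illustrative, while the broker address and group id are the ones that appear in the log in the issue description):

// Direct (receiver-less) stream: Spark tracks the Kafka offsets itself.
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "192.168.3.23:3000"), Set("updates"))

// Regular (receiver-based) stream: the high-level consumer manages offsets through ZooKeeper.
val receiver = KafkaUtils.createStream(
  ssc, "192.168.3.23:2181", "spark-group", Map("updates" -> 1))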

Thank you,

R. Ramalho

> Kafka stream loses leader after 2h of operation 
> ------------------------------------------------
>
>                 Key: SPARK-9476
>                 URL: https://issues.apache.org/jira/browse/SPARK-9476
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.1
>         Environment: Docker, Centos, Spark standalone, core i7, 8Gb
>            Reporter: Ruben Ramalho
>
> This seems to happen every 2h, both with the direct stream and the regular stream. I'm doing window operations over a 1h period (if that can help).
> Here's part of the error message:
> 2015-07-30 13:27:23 WARN  ClientUtils$:89 - Fetching topic metadata with correlation id 10 for topics [Set(updates)] from broker [id:0,host:192.168.3.23,port:3000] failed
> java.nio.channels.ClosedChannelException
> 	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> 	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> 	at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> 	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> 2015-07-30 13:27:23 INFO  SyncProducer:68 - Disconnecting from 192.168.3.23:3000
> 2015-07-30 13:27:23 WARN  ConsumerFetcherManager$LeaderFinderThread:89 - [spark-group_81563e123e9f-1438259236988-fc3d82bf-leader-finder-thread], Failed to find leader for Set([updates,0])
> kafka.common.KafkaException: fetching topic metadata for topics [Set(oversight-updates)] from broker [ArrayBuffer(id:0,host:192.168.3.23,port:3000)] failed
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> 	at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> 	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> Caused by: java.nio.channels.ClosedChannelException
> 	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> 	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> After the crash I tried to communicate with Kafka using a simple Scala consumer and producer and had no problem at all. Spark, though, needs a Kafka container restart to resume normal operation. There are no errors in the Kafka log, apart from an improperly closed connection.
> I have been trying to solve this problem for days; I suspect it has something to do with Spark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org