You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Charan Ganga Phani Adabala <ch...@eiqnetworks.com> on 2015/11/20 14:12:38 UTC

Couldn't find leaders for Set([TOPICNNAME,0])) When we are uisng in Apache Saprk.

Hi All,

We are using the Apache Spark 1.5.1 and kafka_2.10-0.8.2.1 and Kafka DirectStream API to fetch data from Kafka using Spark.

Kafka topic properties: Replication Factor :1 and Partitions : 1
Kafka cluster size: 3 Nodes

When all Kafka nodes are up & running, I could successfully get the data for all the topics.

But when one of the Kafka node is down , we are getting below exceptions and though the Node is up after some time, still we are not succeeded, and the spark job is terminated. and unable fetch the data from remaining topics in Kafka.

ERROR DirectKafkaInputDStream:125 - ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Thanks in advance please help how to resolve the issue.



Thanks & Regards,
Ganga Phani Charan Adabala | Software Engineer
o:  +91-40-23116680 | c:  +91-9491418099
EiQ Networks, Inc.<http://www.eiqnetworks.com/>


[cid:image001.png@01D11C9D.AF5CC1F0]<http://www.eiqnetworks.com/>
"This email is intended only for the use of the individual or entity named above and may contain information that is confidential and privileged. If you are not the intended recipient, you are hereby notified that any dissemination, distribution or copying of the email is strictly prohibited. If you have received this email in error, please destroy the original message."

Re: Couldn't find leaders for Set([TOPICNNAME,0])) When we are uisng in Apache Saprk.

Posted by Cody Koeninger <co...@koeninger.org>.
Also, if you actually want to use kafka, you're much better off with a
replication factor greater than 1, so you get leader re-election.

On Fri, Nov 20, 2015 at 9:20 AM, Cody Koeninger <co...@koeninger.org> wrote:

> Spark specific questions are better directed to the Spark user list.
>
> Spark will retry failed tasks automatically up to a configurable number of
> times.  The direct stream will retry failures on the driver up to a
> configurable number of times.
>
> See
>
> http://spark.apache.org/docs/latest/configuration.html
>
> The properties you're looking for are
>
> spark.task.maxFailures
> spark.streaming.kafka.maxRetries
>
> respectively
>
> On Fri, Nov 20, 2015 at 7:12 AM, Charan Ganga Phani Adabala <
> charana@eiqnetworks.com> wrote:
>
>> Hi All,
>>
>> We are using the Apache Spark 1.5.1 and kafka_2.10-0.8.2.1 and Kafka
>> DirectStream API to fetch data from Kafka using Spark.
>>
>> Kafka topic properties: Replication Factor :1 and Partitions : 1
>> Kafka cluster size: 3 Nodes
>>
>>
>> When all Kafka nodes are up & running, I could successfully get the data
>> for all the topics.
>>
>>
>>
>> But when one of the Kafka node is down , we are getting below exceptions
>> and though the Node is up after some time, still we are not succeeded, and
>> the spark job is terminated. and unable fetch the data from remaining
>> topics in Kafka.
>>
>> ERROR DirectKafkaInputDStream:125 -
>> ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for
>> Set([normalized-tenant4,0]))
>> ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
>> org.apache.spark.SparkException:
>> ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for
>> Set([normalized-tenant4,0]))
>>         at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>>         at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>>         at
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>         at scala.Option.orElse(Option.scala:257)
>>         at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>>         at
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>         at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>         at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>         at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>         at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>         at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>         at
>> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>         at
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
>>         at scala.util.Try$.apply(Try.scala:161)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>>
>> Thanks in advance please help how to resolve the issue.
>>
>>
>>
>>
>>
>>
>>
>> Thanks & Regards,
>>
>> *Ganga Phani Charan Adabala | Software Engineer*
>>
>> o:  +91-40-23116680 | c:  +91-9491418099
>>
>> EiQ Networks, Inc. <http://www.eiqnetworks.com/>
>>
>>
>>
>>
>>
>> [image: cid:image001.png@01D11C9D.AF5CC1F0] <http://www.eiqnetworks.com/>
>>
>> *"This email is intended only for the use of the individual or entity
>> named above and may contain information that is confidential and
>> privileged. If you are not the intended recipient, you are hereby notified
>> that any dissemination, distribution or copying of the email is strictly
>> prohibited. If you have received this email in error, please destroy
>> the original message."*
>>
>
>

Re: Couldn't find leaders for Set([TOPICNNAME,0])) When we are uisng in Apache Saprk.

Posted by Cody Koeninger <co...@koeninger.org>.
Spark specific questions are better directed to the Spark user list.

Spark will retry failed tasks automatically up to a configurable number of
times.  The direct stream will retry failures on the driver up to a
configurable number of times.

See

http://spark.apache.org/docs/latest/configuration.html

The properties you're looking for are

spark.task.maxFailures
spark.streaming.kafka.maxRetries

respectively

On Fri, Nov 20, 2015 at 7:12 AM, Charan Ganga Phani Adabala <
charana@eiqnetworks.com> wrote:

> Hi All,
>
> We are using the Apache Spark 1.5.1 and kafka_2.10-0.8.2.1 and Kafka
> DirectStream API to fetch data from Kafka using Spark.
>
> Kafka topic properties: Replication Factor :1 and Partitions : 1
> Kafka cluster size: 3 Nodes
>
>
> When all Kafka nodes are up & running, I could successfully get the data
> for all the topics.
>
>
>
> But when one of the Kafka node is down , we are getting below exceptions
> and though the Node is up after some time, still we are not succeeded, and
> the spark job is terminated. and unable fetch the data from remaining
> topics in Kafka.
>
> ERROR DirectKafkaInputDStream:125 -
> ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for
> Set([normalized-tenant4,0]))
> ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
> org.apache.spark.SparkException:
> ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for
> Set([normalized-tenant4,0]))
>         at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>         at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>         at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>         at scala.Option.orElse(Option.scala:257)
>         at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>         at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>         at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>         at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>         at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>         at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
>         at scala.util.Try$.apply(Try.scala:161)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
>         at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
> Thanks in advance please help how to resolve the issue.
>
>
>
>
>
>
>
> Thanks & Regards,
>
> *Ganga Phani Charan Adabala | Software Engineer*
>
> o:  +91-40-23116680 | c:  +91-9491418099
>
> EiQ Networks, Inc. <http://www.eiqnetworks.com/>
>
>
>
>
>
> [image: cid:image001.png@01D11C9D.AF5CC1F0] <http://www.eiqnetworks.com/>
>
> *"This email is intended only for the use of the individual or entity
> named above and may contain information that is confidential and
> privileged. If you are not the intended recipient, you are hereby notified
> that any dissemination, distribution or copying of the email is strictly
> prohibited. If you have received this email in error, please destroy
> the original message."*
>