Posted to dev@spark.apache.org by Alberto Rodriguez <ar...@gmail.com> on 2015/03/19 20:10:18 UTC

Exception using the new createDirectStream util method

Hi all,

I am trying to make the new Kafka and Spark Streaming integration work (the
direct, "no receivers" approach
<http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>). I
have created a unit test in which I configure and start both ZooKeeper and
Kafka.

When I try to create the InputDStream using the createDirectStream method
of the KafkaUtils class, I get the following error:

org.apache.spark.SparkException: Couldn't find leader offsets for Set()
org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set()
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)

This is the code that tries to create the DStream:

val messages: InputDStream[(String, String)] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)
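For context, in Spark 1.3 this three-argument createDirectStream overload takes the StreamingContext, a Map[String, String] of Kafka parameters and a Set[String] of topic names. The sketch below shows, in plain Scala with no Spark dependency, one way those two inputs might be assembled for a local test broker. The helper name `directStreamInputs`, the broker address and the topic name are hypothetical; "metadata.broker.list" and "auto.offset.reset" -> "smallest" are the keys described in the 1.3 integration guide linked above.

```scala
object DirectStreamInputs {
  /** Hypothetical helper: builds the kafkaParams map and topic set that the
    * three-argument createDirectStream overload expects. */
  def directStreamInputs(
      brokers: String,
      topicNames: Seq[String],
      fromSmallest: Boolean
  ): (Map[String, String], Set[String]) = {
    // Fail fast on an empty topic list instead of deep inside the stream setup.
    require(topicNames.nonEmpty, "at least one topic is required")
    // The direct approach talks to the brokers itself, so the broker list is required.
    val base = Map("metadata.broker.list" -> brokers)
    // Without auto.offset.reset=smallest, consumption starts at the latest offset.
    val params =
      if (fromSmallest) base + ("auto.offset.reset" -> "smallest") else base
    (params, topicNames.toSet)
  }

  def main(args: Array[String]): Unit = {
    val (kafkaParams, topics) =
      directStreamInputs("localhost:9092", Seq("test"), fromSmallest = true)
    println(kafkaParams)
    println(topics)
    // With Spark on the classpath these would then be passed as:
    // KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    //   ssc, kafkaParams, topics)
  }
}
```

Keeping the parameter building in a plain function makes it easy to check in a unit test before any Spark or Kafka machinery is started.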

Has anyone faced this problem?

Thank you in advance.

Kind regards,

Alberto

Re: Exception using the new createDirectStream util method

Posted by Cody Koeninger <co...@koeninger.org>.
I went ahead and created
https://issues.apache.org/jira/browse/SPARK-6434 to track this.

On Fri, Mar 20, 2015 at 2:44 AM, Alberto Rodriguez <ar...@gmail.com>
wrote:

> You were absolutely right, Cody!! I have just put a message in the Kafka
> topic before creating the DirectStream, and now it is working fine!
>
> Do you think that I should open an issue to warn that the Kafka topic must
> contain at least one message before the DirectStream is created?
>
> Thank you very much! You've just made my day ;)

Re: Exception using the new createDirectStream util method

Posted by Alberto Rodriguez <ar...@gmail.com>.
You were absolutely right, Cody!! I have just put a message in the Kafka
topic before creating the DirectStream, and now it is working fine!

Do you think that I should open an issue to warn that the Kafka topic must
contain at least one message before the DirectStream is created?
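One way to get a clearer failure than "Couldn't find leader offsets for Set()" is to check topic metadata before creating the stream. The sketch below is hypothetical: `checkTopicsReady` is an invented helper, and `partitionsWithLeader` stands for whatever lookup a test harness can provide (for example, one built on Kafka's topic metadata request); neither is a Spark or Kafka API. It reports every requested topic that currently has no partition with a leader.

```scala
object TopicPreflight {
  /** Hypothetical pre-flight check to run before createDirectStream.
    * `partitionsWithLeader(topic)` is an assumed lookup returning the partition ids
    * of `topic` that currently have a leader (empty if no metadata is available yet).
    * Returns Left with a descriptive message if any requested topic is not ready. */
  def checkTopicsReady(
      topics: Set[String],
      partitionsWithLeader: String => Set[Int]
  ): Either[String, Map[String, Set[Int]]] = {
    // Look up every topic in a stable order so the error message is deterministic.
    val found = topics.toSeq.sorted.map(t => t -> partitionsWithLeader(t))
    val notReady = found.collect { case (t, ps) if ps.isEmpty => t }
    if (topics.isEmpty) Left("No topics were requested")
    else if (notReady.nonEmpty)
      Left(s"No partition leaders found for topic(s) ${notReady.mkString(", ")}; " +
        "the topic may not exist yet or may not have any messages")
    else Right(found.toMap)
  }

  def main(args: Array[String]): Unit = {
    // Fake metadata: "events" has leaders for partitions 0 and 1; "empty-topic" has none.
    val fakeMetadata = Map("events" -> Set(0, 1))
    val lookup: String => Set[Int] = t => fakeMetadata.getOrElse(t, Set.empty[Int])
    println(checkTopicsReady(Set("events"), lookup))
    println(checkTopicsReady(Set("events", "empty-topic"), lookup))
  }
}
```

Passing the lookup in as a function keeps the check testable without a running broker.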

Thank you very much! You've just made my day ;)

2015-03-19 23:08 GMT+01:00 Cody Koeninger <co...@koeninger.org>:

> Yeah, I wouldn't be shocked if Kafka's metadata apis didn't return results
> for topics that don't have any messages.  (sorry about the triple negative,
> but I think you get my meaning).
>
> Try putting a message in the topic and seeing what happens.

Re: Exception using the new createDirectStream util method

Posted by Cody Koeninger <co...@koeninger.org>.
Yeah, I wouldn't be shocked if Kafka's metadata apis didn't return results
for topics that don't have any messages.  (sorry about the triple negative,
but I think you get my meaning).

Try putting a message in the topic and seeing what happens.

On Thu, Mar 19, 2015 at 4:38 PM, Alberto Rodriguez <ar...@gmail.com>
wrote:

> Thank you for replying,
>
> Ted, I have been debugging, and the getLeaderOffsets method is not appending
> errors because the findLeaders method, which is called on the first line of
> getLeaderOffsets, is not returning any leaders.
>
> Cody, the topics do not have any messages yet. Could this be an issue??
>
> If you guys want to have a look at the code, I've just uploaded it to my
> GitHub account: big-brother <https://github.com/ardlema/big-brother> (see
> DirectKafkaWordCountTest.scala).
>
> Thank you again!!

Re: Exception using the new createDirectStream util method

Posted by Alberto Rodriguez <ar...@gmail.com>.
Thank you for replying,

Ted, I have been debugging, and the getLeaderOffsets method is not appending
errors because the findLeaders method, which is called on the first line of
getLeaderOffsets, is not returning any leaders.
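The behaviour described here can be reproduced with a small, self-contained model. This is a hypothetical simplification written for illustration, not Spark's source: it assumes, as the getLeaderOffsets excerpt Ted quoted suggests, that per-partition errors are only collected from offset responses sent by partition leaders, and that a generic "Couldn't find leader offsets for <missing partitions>" error is appended when no complete result comes back. `TopicPartition`, `leaders` and `responses` are invented names standing in for Kafka's metadata and offset responses. With no leaders, nothing is ever queried, so the generic message is the only error; if the requested partition set is itself empty, it ends in "Set()".

```scala
object LeaderOffsetsModel {
  // Hypothetical stand-in for Kafka's TopicAndPartition.
  final case class TopicPartition(topic: String, partition: Int)

  /** Simplified model of the control flow discussed in this thread.
    * leaders   : partition -> leader broker, as a findLeaders-like lookup might report it
    * responses : partition -> offset (Right) or per-partition error (Left) from that leader
    */
  def getLeaderOffsets(
      requested: Set[TopicPartition],
      leaders: Map[TopicPartition, String],
      responses: Map[TopicPartition, Either[String, Long]]
  ): Either[List[String], Map[TopicPartition, Long]] = {
    // Group requested partitions by leader; partitions without a leader are never queried.
    val byLeader: Map[String, Set[TopicPartition]] =
      leaders
        .filter { case (tp, _) => requested.contains(tp) }
        .groupBy { case (_, leader) => leader }
        .map { case (leader, tps) => leader -> tps.keySet }

    var result = Map.empty[TopicPartition, Long]
    var errs = List.empty[String]

    // Per-partition errors can only come from leaders that were actually asked.
    for ((_, tps) <- byLeader; tp <- tps; resp <- responses.get(tp)) resp match {
      case Right(offset) => result += tp -> offset
      case Left(err)     => errs = errs :+ err
    }

    // Success is only declared after at least one leader has been contacted.
    if (byLeader.nonEmpty && result.keySet == requested) Right(result)
    else Left(errs :+ s"Couldn't find leader offsets for ${requested.diff(result.keySet)}")
  }

  def main(args: Array[String]): Unit = {
    // No partitions and no leaders: the only error is the generic one.
    println(getLeaderOffsets(Set.empty, Map.empty, Map.empty))
  }
}
```

In this model a real broker-side problem (for example a per-partition error code) still surfaces alongside the generic message, which is the case Ted expected; the empty-leader case is the one that hides it.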

Cody, the topics do not have any messages yet. Could this be an issue??

If you guys want to have a look at the code, I've just uploaded it to my
GitHub account: big-brother <https://github.com/ardlema/big-brother> (see
DirectKafkaWordCountTest.scala).

Thank you again!!

2015-03-19 22:13 GMT+01:00 Cody Koeninger <co...@koeninger.org>:

> What is the value of your topics variable, and does it correspond to
> topics that already exist on the cluster and have messages in them?

Re: Exception using the new createDirectStream util method

Posted by Cody Koeninger <co...@koeninger.org>.
What is the value of your topics variable, and does it correspond to topics
that already exist on the cluster and have messages in them?

On Thu, Mar 19, 2015 at 3:10 PM, Ted Yu <yu...@gmail.com> wrote:

> Looking at KafkaCluster#getLeaderOffsets():
>
>           respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
>             if (por.error == ErrorMapping.NoError) {
> ...
>             } else {
>               errs.append(ErrorMapping.exceptionFor(por.error))
>             }
> There should be some error other than "Couldn't find leader offsets for
> Set()"
>
> Can you check again?
>
> Thanks

Re: Exception using the new createDirectStream util method

Posted by Ted Yu <yu...@gmail.com>.
Looking at KafkaCluster#getLeaderOffsets():

          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
            if (por.error == ErrorMapping.NoError) {
...
            } else {
              errs.append(ErrorMapping.exceptionFor(por.error))
            }
There should be some error other than "Couldn't find leader offsets for
Set()"

Can you check again?

Thanks

On Thu, Mar 19, 2015 at 12:10 PM, Alberto Rodriguez <ar...@gmail.com>
wrote:

> Hi all,
>
> I am trying to make the new Kafka and Spark Streaming integration work (the
> direct, "no receivers" approach
> <http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>). I
> have created a unit test in which I configure and start both ZooKeeper and
> Kafka.
>
> When I try to create the InputDStream using the createDirectStream method
> of the KafkaUtils class, I get the following error:
>
> org.apache.spark.SparkException: Couldn't find leader offsets for Set()
> org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set()
>     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
>
> This is the code that tries to create the DStream:
>
> val messages: InputDStream[(String, String)] =
>   KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>     ssc, kafkaParams, topics)
>
> Has anyone faced this problem?
>
> Thank you in advance.
>
> Kind regards,
>
> Alberto
>