Posted to issues@spark.apache.org by "Cody Koeninger (JIRA)" <ji...@apache.org> on 2015/04/06 16:38:12 UTC

[jira] [Commented] (SPARK-6431) Couldn't find leader offsets exception when creating KafkaDirectStream

    [ https://issues.apache.org/jira/browse/SPARK-6431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481266#comment-14481266 ] 

Cody Koeninger commented on SPARK-6431:
---------------------------------------

I think this got mis-diagnosed on the mailing list, sorry for the confusion.

The only way I've been able to reproduce that exception is by trying to start a stream for a topic that doesn't exist at all.  Alberto, did you actually run kafka-topics.sh --create before starting the job, or in some other way create the topic?  Pretty sure what happened here is that your topic didn't exist the first time you ran the job.  Your brokers were set to auto-create topics, so it did exist the next time you ran the job.  Putting a message into the topic didn't have anything to do with it.

Here's why I think that's what happened.  The following console session is an example, where the "empty" topic existed prior to starting the console, but had no messages.  Topic "hasonemessage" existed and had one message in it.  Topic "doesntexistyet" didn't exist at the beginning of the console session.

The metadata APIs return the same info for existing-but-empty topics as they do for topics with messages in them:

scala> kc.getPartitions(Set("empty")).right
res0: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set([empty,0], [empty,1])))

scala> kc.getPartitions(Set("hasonemessage")).right
res1: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set([hasonemessage,0], [hasonemessage,1])))


Leader offsets are both 0 for the empty topic, as you'd expect:

scala> kc.getLatestLeaderOffsets(kc.getPartitions(Set("empty")).right.get)
res5: Either[org.apache.spark.streaming.kafka.KafkaCluster.Err,Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset]] = Right(Map([empty,1] -> LeaderOffset(localhost,9094,0), [empty,0] -> LeaderOffset(localhost,9093,0)))

And one of the leader offsets is 1 for the topic with one message:

scala> kc.getLatestLeaderOffsets(kc.getPartitions(Set("hasonemessage")).right.get)
res6: Either[org.apache.spark.streaming.kafka.KafkaCluster.Err,Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset]] = Right(Map([hasonemessage,0] -> LeaderOffset(localhost,9092,1), [hasonemessage,1] -> LeaderOffset(localhost,9093,0)))


The first time a metadata request is made against the non-existing topic, it returns empty:

scala> kc.getPartitions(Set("doesntexistyet")).right
res2: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set()))


But if your brokers are configured with auto.create.topics.enable set to true, that metadata request alone is enough to trigger creation of the topic.  Requesting it again shows that the topic has been created:

scala> kc.getPartitions(Set("doesntexistyet")).right
res3: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set([doesntexistyet,0], [doesntexistyet,1])))
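
To make the sequence above concrete, here is a toy in-memory model of it.  This is not Kafka or Spark code; ToyBroker, autoCreate and defaultPartitions are names invented for illustration, and the only behavior modeled is what the session shows: the first metadata request for an unknown topic comes back empty, and with auto-creation on, the next one finds the topic.

```scala
import scala.collection.mutable

// Toy model only, NOT Kafka code: it mimics the observed request sequence.
// autoCreate stands in for the broker setting auto.create.topics.enable.
final class ToyBroker(autoCreate: Boolean, defaultPartitions: Int) {
  // topic name -> number of partitions
  private val topics = mutable.Map.empty[String, Int]

  // Answers with whatever is known at request time. If the topic is unknown
  // and auto-creation is on, the topic is created only after the answer has
  // been computed, so the caller of this request still sees an empty set.
  def partitionsFor(topic: String): Set[(String, Int)] = {
    val known = topics.get(topic)
      .map(n => (0 until n).map(p => (topic, p)).toSet)
      .getOrElse(Set.empty[(String, Int)])
    if (known.isEmpty && autoCreate) topics(topic) = defaultPartitions
    known
  }
}
```

With autoCreate = true, two consecutive calls to partitionsFor("doesntexistyet") give an empty set and then two partitions, which matches res2 and res3.  With autoCreate = false, both calls stay empty.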


If you don't think that explains what happened, please let me know if you have a way of reproducing that exception against an existing-but-empty topic, because I can't.

As far as what to do about this, my instinct is to just improve the error handling for the getPartitions call.  If the topic doesn't exist yet, it shouldn't be returning an empty set; it should be returning an error.
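
A rough sketch of what that error handling could look like.  This is not the actual KafkaCluster code or a proposed patch; the types, the requirePartitions name and the lookup parameter are stand-ins made up for illustration.  The idea is only that a successful lookup which is missing partitions for any requested topic gets turned into a Left instead of being passed along.

```scala
// Stand-in types for illustration only; these are NOT the real Kafka/Spark classes.
final case class TopicAndPartition(topic: String, partition: Int)

object PartitionLookup {
  type Err = List[Throwable]

  // Hypothetical wrapper around some metadata lookup function.
  // A Right that lacks partitions for any requested topic becomes a Left,
  // so callers never proceed with an empty (or partial) partition set.
  def requirePartitions(
      topics: Set[String],
      lookup: Set[String] => Either[Err, Set[TopicAndPartition]]
  ): Either[Err, Set[TopicAndPartition]] =
    lookup(topics) match {
      case Right(found) =>
        val missing = topics -- found.map(_.topic)
        if (missing.isEmpty) Right(found)
        else Left(List(new IllegalStateException(
          s"No partition metadata for topics: ${missing.mkString(", ")}")))
      case failure => failure // pass through errors from the lookup itself
    }
}
```

With something along these lines, starting a stream against "doesntexistyet" would fail with a message naming the missing topic, rather than with "Couldn't find leader offsets for Set()".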


> Couldn't find leader offsets exception when creating KafkaDirectStream
> ----------------------------------------------------------------------
>
>                 Key: SPARK-6431
>                 URL: https://issues.apache.org/jira/browse/SPARK-6431
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: Alberto
>
> When I try to create an InputDStream using the createDirectStream method of the KafkaUtils class and the kafka topic does not have any messages yet am getting the following error:
> org.apache.spark.SparkException: Couldn't find leader offsets for Set()
> org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set()
> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
> If I put a message in the topic before creating the DirectStream everything works fine.


