Posted to user@spark.apache.org by asdf zxcv <be...@gmail.com> on 2016/01/29 01:07:43 UTC

Spark Caching Kafka Metadata

Does Spark cache which Kafka topics exist? A service incorrectly assumes
all the relevant topics exist, even if they are empty, which causes it to fail.
Fortunately the service is restarted automatically, and by default Kafka
creates the topic after it is requested.

I'm trying to create the topic if it doesn't exist using
AdminUtils.createTopic:

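      import java.util.Properties
      import kafka.admin.AdminUtils
      import kafka.utils.ZKStringSerializer
      import org.I0Itec.zkclient.ZkClient

      // 10 s session and connection timeouts; topic gets 1 partition, replication factor 1.
      val zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer)
      while (!AdminUtils.topicExists(zkClient, topic)) {
        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties())
      }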

But I still get the error "Error getting partition metadata for 'topic-name'.
Does the topic exist?" when I execute KafkaUtils.createDirectStream.

I've also tried to implement a retry with a wait, so that the retry should
occur after Kafka has created the requested topic (with
auto.create.topics.enable = true), but this still doesn't work consistently.

This is also a bit frustrating to debug, since the topic is successfully
created about 50% of the time; the other times I get the message "Does the
topic exist?". My thinking is that Spark may be caching the list of existing
Kafka topics and ignoring that I've added a new one. Is this the case? Am I
missing something?


Ben

Re: Spark Caching Kafka Metadata

Posted by Benjamin Han <be...@gmail.com>.
Is there another way to create topics from Spark? Is there any reason the
above code snippet would still produce this error? I've dumbly inserted
waits and retries for testing, but that still doesn't consistently work,
even after waiting several minutes.

On Fri, Jan 29, 2016 at 8:29 AM, Cody Koeninger <co...@koeninger.org> wrote:

> The kafka direct stream doesn't do any explicit caching.  I haven't looked
> through the underlying simple consumer code in the kafka project in detail,
> but I doubt it does either.
>
> Honestly, I'd recommend not using auto created topics (it makes it too
> easy to pollute your topics if someone fat-fingers something when
> interacting with kafka), and instead explicitly creating topics before
> using them.
>
> If you're trying to create the topic in your spark job right before using
> it with direct stream, I can see how there might possibly be a race
> condition - you're using the ZK api, but the direct stream is talking only
> to the broker api.

Re: Spark Caching Kafka Metadata

Posted by Cody Koeninger <co...@koeninger.org>.
The Kafka direct stream doesn't do any explicit caching. I haven't looked
through the underlying simple consumer code in the Kafka project in detail,
but I doubt it does either.

Honestly, I'd recommend not using auto-created topics (it makes it too easy
to pollute your topics if someone fat-fingers something when interacting
with Kafka), and instead explicitly creating topics before using them.

If you're trying to create the topic in your Spark job right before using
it with the direct stream, I can see how there might be a race condition:
you're creating the topic through the ZK API, but the direct stream talks
only to the broker API, so the brokers may not have partition metadata for
the new topic yet.
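
If you do have to create the topic from the job, one way to narrow that race is to poll until the brokers themselves report the topic before calling createDirectStream. The following is only a rough sketch, not something taken from Spark or Kafka: TopicReadiness and waitUntil are hypothetical names, and the isReady check is supplied by the caller so that it can ask a broker rather than ZooKeeper.

```scala
import scala.annotation.tailrec

// Hypothetical helper, not part of Spark or Kafka.
object TopicReadiness {

  /** Calls `isReady` up to `maxAttempts` times, sleeping `delayMs`
    * milliseconds between attempts. Returns true as soon as a check
    * passes, or false if every attempt fails.
    */
  @tailrec
  def waitUntil(isReady: () => Boolean, maxAttempts: Int, delayMs: Long): Boolean =
    if (maxAttempts <= 0) false
    else if (isReady()) true
    else {
      // No point sleeping after the final failed attempt.
      if (maxAttempts > 1) Thread.sleep(delayMs)
      waitUntil(isReady, maxAttempts - 1, delayMs)
    }
}
```

The check you pass in should query the same API the direct stream uses, for example a function that sends a topic metadata request to a broker and returns true once partition metadata comes back without an error. Checking AdminUtils.topicExists there would only confirm the ZooKeeper side again, which is the state that was already known.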
