You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Benjamin Han <be...@gmail.com> on 2016/02/01 18:09:45 UTC

Re: Spark Caching Kafka Metadata

Is there another way to create topics from Spark? Is there any reason the
above code snippet would still produce this error? I've dumbly inserted
waits and retries for testing, but that still doesn't consistently work,
even after waiting several minutes.

On Fri, Jan 29, 2016 at 8:29 AM, Cody Koeninger <co...@koeninger.org> wrote:

> The kafka direct stream doesn't do any explicit caching.  I haven't looked
> through the underlying simple consumer code in the kafka project in detail,
> but I doubt it does either.
>
> Honestly, I'd recommend not using auto created topics (it makes it too
> easy to pollute your topics if someone fat-fingers something when
> interacting with kafka), and instead explicitly creating topics before
> using them.
>
> If you're trying to create the topic in your spark job right before using
> it with direct stream, I can see how there might possibly be a race
> condition - you're using the ZK api, but the direct stream is talking only
> to the broker api.
>
> On Thu, Jan 28, 2016 at 6:07 PM, asdf zxcv <be...@gmail.com>
> wrote:
>
>> Does Spark cache which kafka topics exist? A service incorrectly assumes
>> all the relevant topics exist, even if they are empty, causing it to fail.
>> Fortunately the service is automatically restarted and by default, kafka
>> creates the topic after it is requested.
>>
>> I'm trying to create the topic if it doesn't exist using
>> AdminUtils.createTopic:
>>
>>       val zkClient = new ZkClient("localhost:2181", 10000, 10000,
>> ZKStringSerializer)
>>       while (!AdminUtils.topicExists(zkClient, topic)) {
>>         AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties())
>>       }
>>
>> But I still get an Error getting partition metadata for 'topic-name'.
>> Does the topic exist? when I execute KafkaUtils.createDirectStream
>>
>> I've also tried to implement a retry with a wait such that the retry
>> should occur after Kafka has created the requested topic with auto.create.topics.enable
>> = true, but this still doesn't work consistently.
>>
>> This is a bit frustrating to debug as well since the topic is
>> successfully created about 50% of the time, other times I get message "Does
>> the topic exist?". My thinking is that Spark may be caching the list of
>> extant kafka topics, ignoring that I've added a new one. Is this the case?
>> Am I missing something?
>>
>>
>> Ben
>>
>
>