Posted to users@kafka.apache.org by Ben Osheroff <be...@zendesk.com.INVALID> on 2016/10/24 21:32:38 UTC

programmatic way to check for topic existence?

Hiya!

I've been trying to merge https://github.com/zendesk/maxwell/pull/457,
which adds a much-requested feature to Maxwell: the ability to have a
topic per MySQL table.  When we receive a row we programmatically
generate the topic name, and the first thing we do is call
`KafkaProducer#partitionsFor(topic)`, so that we know how to partition
the data.

The problem I'm running into is detecting the case where a topic
doesn't exist.  If auto-creation is on, `partitionsFor()` seems to
correctly auto-create the topic, but if auto-creation is off the
behavior is kinda wonky; Kafka goes into a metadata-fetch loop, logging

"Error while fetching metadata with correlation id 573 :{topic=UNKNOWN_TOPIC_OR_PARTITION}"

but then ultimately throws a `TimeoutException` at me after about 60
seconds.

I can rescue/rethrow the TimeoutException, but it seems like there might
be a better way that I'm missing.  Any ideas?  Ideally I'd just like a
way to fail fast and cleanly when the topic doesn't exist (and
auto-creation is off).

Thanks,
Ben Osheroff
zendesk.com
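A minimal sketch of the "fail faster" half of this, assuming the 0.10-era Java producer: its `max.block.ms` setting bounds how long `KafkaProducer#partitionsFor()` and `send()` block waiting for metadata, and its default of 60000 ms would account for the roughly one-minute wait before the `TimeoutException`. The class and method names below are hypothetical; only the property keys are standard producer configs. Lowering the limit shortens the wait but does not remove the ambiguity: a timeout still cannot tell a missing topic from an unreachable broker, and the same limit also applies to `send()` when the producer's buffer is full.

```java
import java.util.Properties;

final class FailFastProducerConfig {

    // Hypothetical helper: producer settings that cap how long
    // KafkaProducer#partitionsFor() may block on missing metadata.
    // The result would be handed to new KafkaProducer<>(props) from kafka-clients.
    static Properties producerProps(String bootstrapServers, long maxBlockMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Default is 60000 ms; this limit also applies to send() when the buffer is full.
        props.put("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092", 5000L);
        System.out.println(props.getProperty("max.block.ms")); // prints 5000
    }
}
```

With such a producer, wrapping `producer.partitionsFor(topic)` in a `try`/`catch` for `org.apache.kafka.common.errors.TimeoutException` would surface the failure after about five seconds instead of sixty.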




Re: programmatic way to check for topic existence?

Posted by Ben Osheroff <be...@zendesk.com.INVALID>.
Jayesh,
Thanks, but AFAICT that's just the metadata "shell": the Cluster
object only holds information about a cluster and has no functionality
to actually retrieve it.  I looked briefly through the source and
couldn't find a public API that fills in a Cluster object.
KafkaConsumer has stuff like:

        Cluster cluster = this.metadata.fetch();

But that's all private...

On Tue, Oct 25, 2016 at 02:11:14PM +0000, Thakrar, Jayesh wrote:
> Have a look at the Cluster class, which has a `topics()` method that returns the set of all topics.
>
> https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/Cluster.html
>
> In versions 0.8/0.9 there was also ZkUtils, but the intent is for clients not to query ZooKeeper directly.

Re: programmatic way to check for topic existence?

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
Have a look at the Cluster class, which has a `topics()` method that returns the set of all topics.

https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/Cluster.html

In versions 0.8/0.9 there was also ZkUtils, but the intent is for clients not to query ZooKeeper directly.



Re: programmatic way to check for topic existence?

Posted by Ben Osheroff <be...@zendesk.com.INVALID>.
There are a variety of ways (including consumer.listTopics(), or catching
the TimeoutException) that are "kinda OK", but I was hoping for something a
bit cleaner.  The problems with the consumer.listTopics() method:

- I have to instantiate a Consumer object in an application that only
  ever produces messages.

- It doesn't play nicely with topic auto-creation: if I find that a
  topic isn't in the list, I either have to:

  a) ask the user if they want to go ahead and try anyway, i.e. my
  application will have to come with a --kafka-auto-create flag, or:

  b) still take the hit of calling partitionsFor(), maybe waiting for a
  TimeoutException

I guess I'd ask a different question: is an interface where
partitionsFor() blocks for the full timeout and then throws a
TimeoutException really what the authors *want*?
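One way to fold the listing into the auto-creation decision, sketched under assumptions: `knownTopics` would come from something like `consumer.listTopics().keySet()`, and `autoCreateAllowed` models the hypothetical `--kafka-auto-create` flag from option a). The class and enum names are illustrative, not part of any Kafka API.

```java
import java.util.Set;

final class TopicPreflight {

    // What to do with a row whose generated topic name was just computed.
    enum Action { PRODUCE, TRY_AUTO_CREATE, FAIL_FAST }

    // knownTopics: a snapshot such as consumer.listTopics().keySet().
    // autoCreateAllowed: stands in for a hypothetical --kafka-auto-create flag.
    static Action decide(Set<String> knownTopics, String topic, boolean autoCreateAllowed) {
        if (knownTopics.contains(topic)) {
            return Action.PRODUCE;          // topic is listed, so partitionsFor() should not stall
        }
        if (autoCreateAllowed) {
            return Action.TRY_AUTO_CREATE;  // call partitionsFor(), relying on broker auto-creation
        }
        return Action.FAIL_FAST;            // report the missing topic without waiting for a timeout
    }

    public static void main(String[] args) {
        Set<String> known = Set.of("db.users", "db.orders");
        System.out.println(decide(known, "db.users", false));   // prints PRODUCE
        System.out.println(decide(known, "db.widgets", true));  // prints TRY_AUTO_CREATE
        System.out.println(decide(known, "db.widgets", false)); // prints FAIL_FAST
    }
}
```

The listing is only a snapshot, so a topic created after it was fetched would still look missing until the list is refreshed.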

On Tue, Oct 25, 2016 at 11:02:23PM +0530, Kamal C wrote:
> Ben,
>
> You can list all the available topic information and do a simple lookup
> in the returned map:
>
>     Map<String, List<PartitionInfo>> topics = consumer.listTopics();
>     boolean exists = topics.containsKey(topic);
>
> Isn't that enough?
>
> -- Kamal

Re: programmatic way to check for topic existence?

Posted by Kamal C <ka...@gmail.com>.
Ben,

You can list all the available topic information and do a simple lookup
in the returned map:

    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    boolean exists = topics.containsKey(topic);

Isn't that enough?

-- Kamal
On 25 Oct 2016 22:56, "Ben Osheroff" <be...@zendesk.com.invalid> wrote:

> We won't proceed in the face of a missing table, we'll just crash.  But
> it's still a bad experience for us and the end user; we have to guess
> that maybe a TimeoutException means a missing topic, and we also have to
> wait the N seconds (default 60) for the thing to "timeout".

Re: programmatic way to check for topic existence?

Posted by Ben Osheroff <be...@zendesk.com.INVALID>.
We won't proceed in the face of a missing table; we'll just crash.  But
it's still a bad experience for us and the end user: we have to guess
that maybe a TimeoutException means a missing topic, and we also have to
wait the N seconds (default 60) for the thing to "timeout".

On Tue, Oct 25, 2016 at 12:14:46AM -0700, Andy Chambers wrote:
> You could just catch the exception but if this is per row, that is probably
> prohibitively expensive.
>
> Doesn't the binlog get "create table" events? Wouldn't that be a better
> time to create the topic?
>
> --
> Andy

Re: programmatic way to check for topic existence?

Posted by Andy Chambers <ac...@gmail.com>.
You could just catch the exception, but if this is per row, that is probably
prohibitively expensive.

Doesn't the binlog get "create table" events? Wouldn't that be a better
time to create the topic?

--
Andy
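If the check does run per row, Andy's cost concern can be reduced by asking once per topic and remembering the answer. A minimal sketch, assuming the expensive check is supplied as a `Predicate<String>` (for instance a hypothetical wrapper around a topic listing or a bounded `partitionsFor()` call, not shown). The class name is illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

final class TopicExistenceCache {

    private final Predicate<String> check;                // the expensive existence check
    private final Map<String, Boolean> answers = new HashMap<>();

    TopicExistenceCache(Predicate<String> check) {
        this.check = check;
    }

    // Runs the expensive check only the first time a topic is seen, so the
    // per-row cost becomes a map lookup. Not thread-safe: assumes one producer thread.
    boolean exists(String topic) {
        return answers.computeIfAbsent(topic, check::test);
    }

    // Forget a cached answer, e.g. when a "create table" event arrives for that table.
    void invalidate(String topic) {
        answers.remove(topic);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        TopicExistenceCache cache = new TopicExistenceCache(topic -> {
            calls.incrementAndGet();
            return topic.equals("db.users");
        });
        for (int row = 0; row < 1000; row++) {
            cache.exists("db.users");
        }
        System.out.println(calls.get()); // prints 1
    }
}
```

Negative answers are cached as well, so a topic created later keeps reporting as missing until `invalidate()` is called; a binlog "create table" event would be a natural place to do that.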
