Posted to users@kafka.apache.org by Murilo Tavares <mu...@gmail.com> on 2019/03/22 20:25:11 UTC

KafkaStreams backoff for non-existing topic

Hi
After some research, I've come across a few discussions, and they all tell
me that Kafka Streams requires the topics to be created before starting the
application.
Nevertheless, I'd like my application to keep retrying if a topic does not
exist.
I've seen this thread:
https://groups.google.com/forum/#!topic/confluent-platform/nmfrnAKCM3c,
which is pretty old, and I'd like to know if it's still hard to catch that
Exception in my app.

Thanks
Murilo

Re: KafkaStreams backoff for non-existing topic

Posted by Murilo Tavares <mu...@gmail.com>.
Thank you all for your responses.
That was very helpful. With that in mind, and after some discussion with
the team, we decided to let the application die, so we can monitor and
relaunch it externally.
I think we had something similar to what is described in KAFKA-7970, where
our streams application was wrapped by another application, so the
application would not die. But we are going to move away from that approach
and launch the streams application as a separate process anyway, so that
won't be an issue anymore.
Thanks again.
Murilo


Re: KafkaStreams backoff for non-existing topic

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Patrik,

Are you referring to the source topic not being available or the state
directory not being available?

For the source-topic-not-available case that Murilo asked about, the main
issue is the "split brain" case: in very early versions of Kafka, Streams
did check whether topics were available before starting up. We then
observed that when multiple instances of a Streams app start up at the same
time, they may get different answers and hence behave differently, i.e.
some start normally while others report an error and shut down. This
depends on 1) concurrent edge cases where topics are created just before or
around the time the Streams instances are started, and 2) which broker each
instance talks to when refreshing its metadata.

So what we did in KAFKA-5037 is that only one instance, the leader of the
corresponding consumer group, makes the decision. If that leader decides
that not all topics are available, it propagates the decision to shut down
to everyone within the group. Of course, depending on how long each of your
instances takes to start and how they form a group, a late-started instance
that missed the previous group generation may still see the world
differently, but that's fixable with KIP-345, which is coming along as
well.

Guozhang




-- 
-- Guozhang

Re: KafkaStreams backoff for non-existing topic

Posted by Patrik Kleindl <pk...@gmail.com>.
Hi Guozhang
Just a small question: why can't this be checked when trying to instantiate
KafkaStreams?
The Topology should know all topics, and the existence of the topics could
be verified with the AdminClient.
This would allow failing fast, similar to when the state directory is not
available.
Or am I missing something?
best regards
Patrik
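
For illustration, the pre-flight check proposed above might look roughly
like the sketch below. It is only a sketch under stated assumptions:
`TopicPreflight` and `missingTopics` are hypothetical names, and the set of
existing topics is passed in by hand so the example runs without a broker.
Against a real cluster that set could come from
`AdminClient#listTopics().names().get()`, which is not exercised here. Note
that a per-instance check like this is exactly what can give different
answers on different instances, as the reply to this message explains.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compares the topics an application needs with the
// topics the cluster reports. Not part of Kafka Streams.
final class TopicPreflight {

    // Returns the required topics that are absent from `existing`,
    // sorted by name so the result is stable for logging.
    static Set<String> missingTopics(Set<String> required, Set<String> existing) {
        Set<String> missing = new TreeSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    public static void main(String[] args) {
        // Assumption: in a real application `existing` would be fetched
        // from the cluster, e.g. via AdminClient#listTopics().names().get().
        Set<String> required = Set.of("topicA", "topicB");
        Set<String> existing = Set.of("topicA");

        Set<String> missing = missingTopics(required, existing);
        if (!missing.isEmpty()) {
            System.out.println("Refusing to start, missing topics: " + missing);
        }
    }
}
```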


Re: KafkaStreams backoff for non-existing topic

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Murilo,

Just to give some more background to John's message and KAFKA-7970 here.
The main source of trickiness is the scenario of "topics being partially
available", e.g. say your application is joining topics A and B, and topicA
exists but topicB does not (this is surprisingly common due to human
errors, topic creation race conditions, etc). Then you have a few options
at hand:

1. Just start the app normally, which will only process data from topicA
and none from topicB. When topicB is created later, the app will
auto-rebalance to get the data (this is guaranteed by Streams itself).
However, until that happens the join operator keeps proceeding with no data
from topicB to join against. This was actually the behavior before Kafka
version 2.0, and many users complained about it.

2. Do not start the app at all; notify the users that some topics are
missing and stop. This is what we changed to in KAFKA-5037.

3. Let the app stay up and running, but do not process any data at all
until all subscribed topics become available.


Now, depending on their use cases and preferences, users may prefer any of
these options. The reason we chose 2) is to give users some control over
the event so that they can build their own wrapping logic on top of it
(e.g. as John suggested). Hope this helps clarify things for you.


Guozhang
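
As a rough illustration of the "wrapping logic" mentioned above, a retry
loop with exponential backoff could be sketched as follows. Everything here
is an assumption for illustration: `BackoffRetry` and `retry` are
hypothetical names, and the attempt is a plain `BooleanSupplier`. In a real
application each attempt would create a new KafkaStreams object, start it,
and report whether start-up succeeded.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical wrapper: retries an attempt with exponential backoff.
final class BackoffRetry {

    // Calls `attempt` until it returns true or `maxAttempts` is reached.
    // The delay doubles after each failure, capped at `maxDelay`.
    // Returns the number of attempts used, or -1 if it gave up
    // (including when the waiting thread is interrupted).
    static int retry(BooleanSupplier attempt, int maxAttempts,
                     Duration initialDelay, Duration maxDelay) {
        Duration delay = initialDelay;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return -1;
                }
                Duration doubled = delay.multipliedBy(2);
                delay = doubled.compareTo(maxDelay) > 0 ? maxDelay : doubled;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in attempt that fails twice and then succeeds; a real one
        // would build and start a fresh KafkaStreams instance.
        int used = retry(() -> ++calls[0] >= 3, 5,
                Duration.ofMillis(10), Duration.ofMillis(40));
        System.out.println("attempts used: " + used);
    }
}
```

Capping the delay keeps a long outage from pushing the wait time up without
bound; the cap and the attempt limit are tuning choices, not values taken
from this thread.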




-- 
-- Guozhang

Re: KafkaStreams backoff for non-existing topic

Posted by John Roesler <jo...@confluent.io>.
Hi, Murilo,

I found https://issues.apache.org/jira/browse/KAFKA-7970, which makes it
sound like the answer is currently "yes". Unfortunately, it is still tricky
to handle this case, although the situation may improve soon.

In the meantime, you can try to work around it with the StateListener.
When Streams has a successful start-up, you'll see it transition from
REBALANCING to RUNNING, so if you see it transition to PENDING_SHUTDOWN,
NOT_RUNNING, or ERROR before you see "oldState: REBALANCING && newState:
RUNNING", you know that Streams did not have a successful startup. It
sounds like you can't determine programmatically *why* this happened, but
you can log a warning or error and then create a new KafkaStreams object
and try starting it again.

I hope this helps, and feel free to comment on that ticket to add your own
perspective to the issue!

Thanks,
-John
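
The state check described above could be sketched roughly as follows. This
is a minimal sketch, not a definitive implementation: `StreamsState` is a
stand-in enum that mirrors the state names mentioned in this message so the
example runs without Kafka on the classpath, and `StartupTracker`,
`Outcome`, and `replay` are hypothetical names. The `onChange(newState,
oldState)` method has the same parameter order as
`KafkaStreams.StateListener#onChange`, so with the real library it could be
wired in via `streams.setStateListener(...)` before calling
`streams.start()`.

```java
// Stand-in for org.apache.kafka.streams.KafkaStreams.State (assumption:
// only the states relevant to this sketch are modeled).
enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

// Hypothetical helper that classifies a start-up attempt from the
// sequence of state transitions it is shown.
final class StartupTracker {
    enum Outcome { PENDING, STARTED, FAILED }

    private Outcome outcome = Outcome.PENDING;

    // Same parameter order as KafkaStreams.StateListener#onChange.
    synchronized void onChange(StreamsState newState, StreamsState oldState) {
        if (outcome != Outcome.PENDING) {
            return; // the first decisive transition wins
        }
        if (oldState == StreamsState.REBALANCING && newState == StreamsState.RUNNING) {
            outcome = Outcome.STARTED;
        } else if (newState == StreamsState.PENDING_SHUTDOWN
                || newState == StreamsState.NOT_RUNNING
                || newState == StreamsState.ERROR) {
            outcome = Outcome.FAILED; // terminal state seen before RUNNING
        }
    }

    synchronized Outcome outcome() {
        return outcome;
    }

    // Feeds a sequence of states through a fresh tracker, treating each
    // adjacent pair as one (oldState -> newState) transition.
    static Outcome replay(StreamsState... states) {
        StartupTracker tracker = new StartupTracker();
        for (int i = 1; i < states.length; i++) {
            tracker.onChange(states[i], states[i - 1]);
        }
        return tracker.outcome();
    }

    public static void main(String[] args) {
        // Hypothetical transition sequences, for illustration only.
        System.out.println(replay(StreamsState.CREATED,
                StreamsState.REBALANCING, StreamsState.RUNNING));
        System.out.println(replay(StreamsState.CREATED,
                StreamsState.REBALANCING, StreamsState.PENDING_SHUTDOWN,
                StreamsState.NOT_RUNNING));
    }
}
```

Once the outcome is FAILED, the application can log a warning, close the
old instance, and build a new KafkaStreams object for the next attempt. A
failure seen after STARTED is deliberately ignored here, since it is a
runtime problem rather than a failed start-up.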
