Posted to users@kafka.apache.org by Alex Brekken <br...@gmail.com> on 2019/10/01 03:16:23 UTC

Re: Kafka Streams can't run normally after restart/redeployment

You could try increasing retries to see if that helps, as well as lowering
the producer batch size.  (I think the retries default is Integer.MAX when
you're on Kafka Streams version 2.1 or higher, so you can definitely
increase it beyond 5.)  Additionally, you could look at the
"delivery.timeout.ms" config property.  The default is 2 minutes, but you
could experiment with increasing it as well. Another property to check if
you're getting timeout exceptions would be "default.api.timeout.ms".  Those
are just some initial ideas, good luck!
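
For illustration, here is a minimal sketch of wiring those producer-level
settings into a Streams config (the values are only examples to experiment
with, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Producer-level settings reach the embedded producer via the "producer." prefix:
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 8192); // smaller batches expire less often
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 300000); // up from the 2-minute default
// default.api.timeout.ms is a consumer-level setting, so it takes the "consumer." prefix:
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 120000);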

Alex

On Thu, Sep 26, 2019 at 6:02 PM Xiyuan Hu <xi...@gmail.com> wrote:

> Thanks Alex! Some updates:
>
> I tried restarting the service in the staging pool, which has far less
> traffic than the production environment, and after the restart the
> application works fine without issues. So I assume the reason I can't
> restart the service in production is the huge lag there? The lag is mostly
> on the source topic and the first repartition topic (03-repartition).
> Other internal topics don't have lag. From the log, I also found that
> many nodes raised this exception:
> Failed to commit stream task 1_55 due to the following error:
> INFO | jvm 1 | 2019/09/26 14:35:37 |
> org.apache.kafka.streams.errors.StreamsException: task [1_55] Abort
> sending since an error caught with a previous record (...) to topic
> XXX-0000000003-changelog due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 45 record(s)
> for XXX-0000000003-changelog-55:300100 ms has passed since batch
> creation
> INFO | jvm 1 | 2019/09/26 14:35:37 | You can increase producer
> parameter `retries` and `retry.backoff.ms` to avoid this error.
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring
> 45 record(s) for XXX-0000000003-changelog-55:300100 ms has passed
> since batch creation
>
> Once a thread raises the above exception, it transitions to the DEAD
> state and can't process anymore. I followed the error message and
> increased retries to 5. But still, a lot of threads are dead due to:
> java.lang.IllegalStateException: No current assignment for partition
> XXX-0000000003-repartition-30
>
> I checked most instances, and most threads are stuck at either
> `PARTITIONS_REVOKED` or `PARTITIONS_ASSIGNED`. The only configs I changed
> were retries to 5, retry.backoff.ms to 300000 ms, request.timeout.ms to
> 300000 ms, and max.poll.records to 2000. What's the right approach to
> changing those config values?
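>
> Concretely, I set them roughly like this (a sketch of my config; I pass
> the client-level ones through the Streams producer/consumer prefixes):
>
> props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 5);
> props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 300000);
> props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 300000);
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 2000);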
>
> Thanks a lot!
>
> On Thu, Sep 26, 2019 at 6:17 PM Alex Brekken <br...@gmail.com> wrote:
> >
> > 1.  Yeah, I'm not sure why restarting is causing you problems.  You
> > shouldn't have to change your application ID just to get data flowing,
> > so something is wrong there; I'm just not sure what.
> >
> > 2.  Lag on the source topic?  I guess that depends on how long your
> > application is down and how quickly it can catch up once it's running.
> >
> > " Your suggestion will populate the duplicate messages to downstreams,
> > just the count number will be accurate. Is that correct?"
> >
> > Yes, that's correct. The final count() will actually subtract 1 when
> > you get a duplicate in the upstream KTable, and then add 1 back again,
> > resulting in no change to the final number.
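> >
> > In code, that corresponds to something like this (a sketch;
> > dedupedTable and mapToNewKey() are stand-ins for your table and key
> > mapper):
> >
> > KTable<String, Long> counts = dedupedTable
> >     .groupBy((key, value) -> KeyValue.pair(mapToNewKey(key), value))
> >     .count(); // on a KGroupedTable, an update subtracts the old value and adds the new one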
> >
> > Alex
> >
> >
> > On Thu, Sep 26, 2019 at 2:38 PM Xiyuan Hu <xi...@gmail.com> wrote:
> >
> > > Thanks for the reply!
> > >
> > > I have two questions:
> > > 1) The no-output issue only happens when I restart/redeploy the
> > > application with the same application ID. When I run the application
> > > for the first time, it works fine. So I assume suppress() is working
> > > fine, at least for the first run. What I can't understand is how
> > > Kafka Streams behaves across a restart/redeployment.
> > > 2) Another thought: my application has a huge lag (50M messages).
> > > Will that be a problem during restart/redeployment?
> > >
> > > Your suggestion will still send the duplicate messages downstream;
> > > just the count will be accurate. Is that correct?
> > >
> > > Thanks a lot!!
> > >
> > > On Thu, Sep 26, 2019 at 8:22 AM Alex Brekken <br...@gmail.com> wrote:
> > > >
> > > > So I'm not exactly sure why suppress() isn't working for you,
> > > > because it should send out a final message once the window closes -
> > > > assuming you're still getting new messages flowing through the
> > > > topology.  Have you tried using the count function in KGroupedTable?
> > > > It should handle duplicates correctly.  So a slightly modified
> > > > version of your topology might look like this:
> > > >
> > > > deserializedStream.selectKey( ... )
> > > >     .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > > >     .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > >     .reduce((value1, value2) -> value2)
> > > >     .filter( ... )
> > > >     .groupBy( ... ) // change your key here as needed
> > > >     .count() // this count handles duplicates and still comes out with the right answer
> > > >     .toStream()
> > > >     .selectKey( ... )
> > > >     .to(outputTopic);
> > > >
> > > > The idea here is that you're not preventing duplicate messages
> > > > from flowing through; instead you're tolerating them and not letting
> > > > them incorrectly change your counts.  Also, you had a mapValues()
> > > > call in there too, which creates another KTable.  It might not
> > > > matter, but could you do that as part of the reduce() step maybe?
> > > > (Or replace the reduce with aggregate, which lets you have a
> > > > different type than your input type.)  Then your topology would only
> > > > have 2 KTables instead of 3.  Good luck, if this doesn't work then
> > > > I'm out of ideas.  :)
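> > > >
> > > > For example, the reduce-plus-mapValues could collapse into an
> > > > aggregate() along these lines (just a sketch - MyAgg is a made-up
> > > > aggregate type, agg.update() a made-up method, and the serdes are
> > > > assumptions):
> > > >
> > > > .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > > > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > .aggregate(
> > > >     MyAgg::new,                             // initializer for the new value type
> > > >     (key, event, agg) -> agg.update(event), // fold each Event into the aggregate
> > > >     Materialized.with(Serdes.String(), new JsonSerde<>(MyAgg.class)))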
> > > >
> > > > Alex
> > > >
> > > > On Thu, Sep 26, 2019 at 12:06 AM Xiyuan Hu <xi...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > The first selectKey/groupBy/windowedBy/reduce is there to group
> > > > > messages by key and drop duplicated messages based on the new key,
> > > > > so that for each 1-hour time window each key emits only 1 message.
> > > > > I use suppress() to make sure only the latest message per time
> > > > > window will be sent.
> > > > >
> > > > > The second selectKey/groupBy/reduce does the counting: after
> > > > > deduping, it counts how many different messages there are per
> > > > > window.
> > > > >
> > > > > e.g. the first groupBy is keyed by (A + B) and drops all
> > > > > duplicates; the second groupBy is keyed by (window start time + A).
> > > > > If I comment out suppress(), I assume all updates during reduce()
> > > > > will be propagated to the next step?
> > > > >
> > > > > Thanks!
> > > > >
> > > > > On Wed, Sep 25, 2019 at 11:16 PM Alex Brekken <br...@gmail.com> wrote:
> > > > > >
> > > > > > You might want to try temporarily commenting out the suppress()
> > > > > > call just to see if that's the cause of the issue. That said,
> > > > > > what is the goal of this topology? It looks like you're trying to
> > > > > > produce a count at the end for a key.  Are the windowedBy() and
> > > > > > suppress() there just to eliminate duplicates, or do you need the
> > > > > > final results grouped by the 60-minute window?
> > > > > >
> > > > > > Alex
> > > > > >
> > > > > > On Wed, Sep 25, 2019 at 9:26 PM Xiyuan Hu <xiyuan.huhu@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Alex,
> > > > > > > Thanks for the reply!
> > > > > > >
> > > > > > > Yes. After deploying with the same application ID, the source
> > > > > > > topic has new messages and the application is consuming them,
> > > > > > > but there is no output at the end.
> > > > > > > The suppress() call is:
> > > > > > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > > >
> > > > > > > The topology looks like this:
> > > > > > >
> > > > > > > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > > > > > > KStream<String, Event> deserializedStream = source.mapValues( ... );
> > > > > > >
> > > > > > > KStream<Windowed<String>, Event> dedupedStream =
> > > > > > >     deserializedStream.selectKey( ... )
> > > > > > >     .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > > > > > >     .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > > > >     .reduce((value1, value2) -> value2)
> > > > > > >     .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > > >     .toStream();
> > > > > > >
> > > > > > > dedupedStream.selectKey( ... )
> > > > > > >     .mapValues( ... )
> > > > > > >     .filter( ... )
> > > > > > >     .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> > > > > > >     .reduce((value1, value2) -> {
> > > > > > >         long count1 = value1.getCount();
> > > > > > >         long count2 = value2.getCount();
> > > > > > >         value2.setCount(count1 + count2);
> > > > > > >         return value2;
> > > > > > >     })
> > > > > > >     .toStream()
> > > > > > >     .selectKey( ... )
> > > > > > >     .to(outputTopic);
> > > > > > >
> > > > > > > On Wed, Sep 25, 2019 at 10:14 PM Alex Brekken <brekkal@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi Xiyuan, just to clarify: after you restart the application
> > > > > > > > (using the same application ID as previously) there ARE new
> > > > > > > > messages in the source topic and your application IS consuming
> > > > > > > > them, but you're not seeing any output at the end?  How are you
> > > > > > > > configuring your suppress() call?  Is it possible that messages
> > > > > > > > are being held there and not emitted further downstream?  Does
> > > > > > > > commenting out the suppress call cause data to flow all the way
> > > > > > > > through?  In order to help further we might need to see your
> > > > > > > > actual topology code, if that's possible.
> > > > > > > >
> > > > > > > > Alex
> > > > > > > >
> > > > > > > > On Wed, Sep 25, 2019 at 2:17 PM Xiyuan Hu <xiyuan.huhu@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > If I change the application id, it will start processing
> > > > > > > > > new messages, I assume? The old data will be dropped. But
> > > > > > > > > this solution will not work for production deployments,
> > > > > > > > > since we can't change the application id for each release.
> > > > > > > > >
> > > > > > > > > My code looks like below:
> > > > > > > > >
> > > > > > > > > builder.stream(topicName)
> > > > > > > > > .mapValues( ... )
> > > > > > > > > .selectKey(selectKey A)
> > > > > > > > > .groupByKey( ... )
> > > > > > > > > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > > > > > > .reduce((value1, value2) -> value2)
> > > > > > > > > .suppress( ... )
> > > > > > > > > .toStream()
> > > > > > > > > .selectKey(selectKey B)
> > > > > > > > > .mapValues( ... )
> > > > > > > > > .filter( ... )
> > > > > > > > > .groupByKey( ... )
> > > > > > > > > .reduce( ... )
> > > > > > > > > .toStream()
> > > > > > > > > .to( ... );
> > > > > > > > >
> > > > > > > > > It creates 5 internal topics: 03-repartition, 03-changelog,
> > > > > > > > > 09-changelog, 14-repartition, and 14-changelog.
> > > > > > > > >
> > > > > > > > > When I restart/redeploy the application, only
> > > > > > > > > 03-repartition has incoming traffic and messages, but no
> > > > > > > > > out-traffic. The other internal topics have no traffic at
> > > > > > > > > all after a restart/redeployment.
> > > > > > > > > It only works when I change the application ID. Should I
> > > > > > > > > call streams.cleanUp() before starting the stream each
> > > > > > > > > time? Or is something else going wrong?
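> > > > > > > > >
> > > > > > > > > i.e. something like this (a sketch of what I mean):
> > > > > > > > >
> > > > > > > > > streams.cleanUp(); // deletes this app ID's local state directory so it gets rebuilt from the changelog topics
> > > > > > > > > streams.start();
> > > > > > > > > Runtime.getRuntime().addShutdownHook(new Thread(streams::close));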
> > > > > > > > >
> > > > > > > > > Thanks a lot!
> > > > > > > > >
> > > > > > > > > On Wed, Sep 25, 2019 at 2:35 PM Boyang Chen <reluctanthero104@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hey Xiyuan,
> > > > > > > > > >
> > > > > > > > > > I would assume it's easier for us to help you by
> > > > > > > > > > reading your application with a full paste of the code (a
> > > > > > > > > > prototype). The fact that changing the application id
> > > > > > > > > > works suggests that re-processing all the data shall
> > > > > > > > > > work; do I understand that correctly?
> > > > > > > > > >
> > > > > > > > > > Boyang
> > > > > > > > > >
> > > > > > > > > > On Wed, Sep 25, 2019 at 8:16 AM Xiyuan Hu <xiyuan.huhu@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I'm running a Kafka Streams app (v2.1.0) with windowed
> > > > > > > > > > > functions (reduce and suppress). One thing I noticed is
> > > > > > > > > > > that every time I redeploy or restart the application, I
> > > > > > > > > > > have to change the application ID to a new one;
> > > > > > > > > > > otherwise only the reduce-repartition internal topic has
> > > > > > > > > > > input traffic (and it has no out-traffic), and all the
> > > > > > > > > > > other internal topics have no traffic at all. It looks
> > > > > > > > > > > like the data just flows into the first internal
> > > > > > > > > > > repartition topic (reduce-repartition); the
> > > > > > > > > > > reduce-changelog has no traffic, and there is no output
> > > > > > > > > > > traffic as well.
> > > > > > > > > > >
> > > > > > > > > > > Does anyone know what's wrong? Changing the
> > > > > > > > > > > application ID and creating new internal topics each
> > > > > > > > > > > time doesn't seem like the right way to go.
> > > > > > > > > > >
> > > > > > > > > > > I started the app like below:
> > > > > > > > > > >
> > > > > > > > > > > streams = new KafkaStreams(topology.getTopology(config), properties.getProperties());
> > > > > > > > > > > streams.start();
> > > > > > > > > > >
> > > > > > > > > > > Any help would be appreciated! Thanks!
> > > > > > > > > > >