Posted to users@kafka.apache.org by Jonathan Santilli <jo...@gmail.com> on 2019/03/01 17:11:51 UTC

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Hello John, hope you are well.
I have tested the version 2.2 release candidate (although I know it has
been postponed).
I have been following this email thread because I think I am experiencing
the same issue. I have reported it in an email to this list, and all the
details are also on SO (
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
).
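
For context, the topology is roughly of this shape (a simplified sketch
with illustrative topic names and window sizes, not the exact code from
the SO post):

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
    import org.apache.kafka.streams.kstream.TimeWindows;

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input-topic")
           .groupByKey()
           // Window the records; grace() bounds how late records may arrive.
           .windowedBy(TimeWindows.of(Duration.ofMinutes(1))
                                  .grace(Duration.ofSeconds(30)))
           .count()
           // Emit only the final result per window, once the window closes.
           .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
           // Unwrap the windowed key; serde configuration omitted here.
           .toStream((windowedKey, count) -> windowedKey.key())
           .to("output-topic");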

After the test, the result is the same as before (at least in my case):
already processed records are passed again to the output topic, causing
data duplication:

...
2019-03-01 16:55:23,808 INFO
[XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
internals.StoreChangelogReader (StoreChangelogReader.java:221) -
stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No
checkpoint found for task 1_10 state store
KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS
turned on. *Reinitializing
the task and restore its state from the beginning.*

...


I was hoping for this to be fixed, but that is not the case, at least for
me.

If you can, please take a look at the question on SO. I was in contact with
Matthias about it; he also pointed me to the place where the potential bug
could be present.

Please let me know your thoughts.


Cheers!
--
Jonathan


On Tue, Feb 26, 2019 at 5:23 PM John Roesler <jo...@confluent.io> wrote:

> Hi again, Peter,
>
> Just to close the loop about the bug in Suppress, we did get what appears
> to be the same report from a few other people:
> https://issues.apache.org/jira/browse/KAFKA-7895
>
> I also managed to reproduce the duplicate-result behavior, which could
> cause it to emit both intermediate results and duplicate final results.
>
> There's a patch for it in the 2.2 release candidate. Perhaps you can try it
> out and see if it resolves the issue for you?
>
> I'm backporting the fix to 2.1 as well, but I unfortunately missed the last
> 2.1 bugfix release.
>
> Thanks,
> -John
>
> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <jo...@confluent.io> wrote:
>
> > Hi Peter,
> >
> > Thanks for the replies.
> >
> > Regarding transactions:
> > Yes, actually, with EOS enabled, the changelog and the output topics are
> > all produced with the same transactional producer, within the same
> > transactions. So it should already be atomic.
> >
> > Regarding restore:
> > Streams doesn't put the store into service until the restore is
> > completed, so it should be guaranteed not to happen. But there's of
> > course no guarantee that I didn't mess something up. I'll take a hard
> > look at it.
> >
> > Regarding restoration and offsets:
> > Your guess is correct: Streams tracks the latest stored offset outside of
> > the store implementation itself, specifically by writing a file (called a
> > Checkpoint File) in the state directory. If the file is there, it reads
> > that offset and restores from that point. If the file is missing, it
> > restores from the beginning of the stream. So it should "just work" for
> > you. Just for completeness, there have been several edge cases discovered
> > where this mechanism isn't completely safe, so in the case of EOS, I
> > believe we actually disregard that checkpoint file and the prior state
> > and always rebuild from the earliest offset in the changelog.
> >
> > Personally, I would like to see us provide the ability to store the
> > checkpoint inside the state store, so that checkpoint updates are
> > linearized correctly w.r.t. data updates, but I actually haven't
> > mentioned this thought to anyone until now ;)
> >
> > Finally, regarding your prior email:
> > Yes, I was thinking that the "wrong" output values might be part of
> > rolled-back transactions and therefore enabling read-committed mode on
> > the consumer might tell a different story than what you've seen to date.
> >
> > I'm honestly still baffled about those intermediate results that are
> > sneaking out. I wonder if it's something specific to your data stream,
> > like maybe there is an edge case when two records have exactly the same
> > timestamp? I'll have to stare at the code some more...
> >
> > Regardless, in order to reap the benefits of running the app with EOS,
> > you really have to also set your consumers to read_committed. Otherwise,
> > you'll be seeing output data from aborted (aka rolled-back) transactions,
> > and you miss the intended "exactly once" guarantee.
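> >
> > Concretely, that is just the standard client config (a sketch; the
> > constant names are the stock ones from ConsumerConfig and StreamsConfig):
> >
> >     Properties consumerProps = new Properties();
> >     // Consumers of the output topic then see only committed records:
> >     consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
> >     // ...while the Streams app itself runs with EOS:
> >     streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> >                      StreamsConfig.EXACTLY_ONCE);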
> >
> > Thanks,
> > -John
> >
> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <pe...@gmail.com>
> > wrote:
> >
> >> Hi John,
> >>
> >> Haven't been able to reinstate the demo yet, but I have been re-reading
> >> the following scenario of yours....
> >>
> >> On 1/24/19 11:48 PM, Peter Levart wrote:
> >> > Hi John,
> >> >
> >> > On 1/24/19 3:18 PM, John Roesler wrote:
> >> >
> >> >>
> >> >> The reason is that, upon restart, the suppression buffer can only
> >> >> "remember" what got sent & committed to its changelog topic before.
> >> >>
> >> >> The scenario I have in mind is:
> >> >>
> >> >> ...
> >> >> * buffer state X
> >> >> ...
> >> >> * flush state X to buffer changelog
> >> >> ...
> >> >> * commit transaction T0; start new transaction T1
> >> >> ...
> >> >> * emit final result X (in uncommitted transaction T1)
> >> >> ...
> >> >> * crash before flushing to the changelog the fact that state X was
> >> >> emitted.
> >> >> Also, transaction T1 gets aborted, since we crash before committing.
> >> >> ...
> >> >> * restart, restoring state X again from the changelog (because the
> >> >> emit didn't get committed)
> >> >> * start transaction T2
> >> >> * emit final result X again (in uncommitted transaction T2)
> >> >> ...
> >> >> * commit transaction T2
> >> >> ...
> >> >>
> >> >> So, the result gets emitted twice, but the first time is in an
> >> >> aborted transaction. This leads me to another clarifying question:
> >> >>
> >> >> Based on your first message, it seems like the duplicates you observe
> >> >> are in the output topic. When you read the topic, do you configure your
> >> >> consumer with "read committed" mode? If not, you'll see "results" from
> >> >> uncommitted transactions, which could explain the duplicates.
> >>
> >> ...and I was thinking that perhaps the right solution to the suppression
> >> problem would be to use transactional producers for the resulting output
> >> topic AND the store change-log. Is this possible? Does the compaction of
> >> the log on the brokers work for transactional producers as expected? In
> >> that case, the sending of the final result and the marking of that fact
> >> in the store change log would together be an atomic operation.
> >> That said, I think there's another problem with suppression: it looks
> >> like the suppression processor is already processing the input while the
> >> state store has not been fully restored yet, or something related... Is
> >> this guaranteed not to happen?
> >>
> >> And now something unrelated I wanted to ask...
> >>
> >> I'm trying to create my own custom state store. From the API I can see
> >> it is pretty straightforward. One thing that I don't quite understand is
> >> how Kafka Streams knows whether to replay the whole change log after the
> >> store registers itself, or just a part of it, and which part (from which
> >> offset per partition). There doesn't seem to be any API point through
> >> which the store could communicate this information back to Kafka
> >> Streams. Is such bookkeeping performed outside the store? Does Kafka
> >> Streams first invoke flush() on the store and then note down the
> >> offsets from the change log producer somewhere? So next time the store
> >> is brought up, the log is only replayed from the last noted-down offset?
> >> Then it can happen that the store gets some log entries that have already
> >> been incorporated into it (from the point of one flush before), but it
> >> never misses any... In any case, there has to be an indication somewhere
> >> that the store didn't survive and has to be rebuilt from scratch. How
> >> does Kafka Streams detect that situation? By placing some marker file
> >> into the directory reserved for the store's local storage?
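> >>
> >> The registration point I mean is the store's init() method; roughly, as
> >> a sketch against the 2.x ProcessorContext API:
> >>
> >>     public void init(ProcessorContext context, StateStore root) {
> >>         // Registering hands Streams a restore callback; Streams itself
> >>         // decides, from its own checkpoint bookkeeping, which changelog
> >>         // offsets to replay into this callback.
> >>         context.register(root, (byte[] key, byte[] value) -> {
> >>             // apply the restored record to the store
> >>         });
> >>     }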
> >>
> >> Regards, Peter
> >>
> >>
>


-- 
Santilli Jonathan

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Jonathan Santilli <jo...@gmail.com>.
Sure John, I will document it.


Thanks a lot for your reply.
--
Jonathan



On Tue, Mar 5, 2019 at 7:38 PM John Roesler <jo...@confluent.io> wrote:



-- 
Santilli Jonathan

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi Jonathan,

Just a quick update: I have not been able to reproduce the duplicates issue
with the 2.2 RC, even with a topology very similar to the one you included
in your stackoverflow post.

I think we should treat this as a new bug. Would you mind opening a new
Jira bug ticket with some steps to reproduce the problem, and also exactly
the behavior you observe?

Thanks,
-John

On Mon, Mar 4, 2019 at 10:41 PM John Roesler <jo...@confluent.io> wrote:


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi Jonathan,

Sorry to hear that the feature is causing you trouble as well, and that the
2.2 release candidate didn't seem to fix it.

I'll try and do a repro based on the code in your SO post tomorrow.

I don't think it's related to the duplicates, but that shutdown error is
puzzling. Can you print the topology (with topology.describe())? This
will tell us what is in task 1 (i.e., *1_*) of your program.
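
That is, something along these lines (a sketch, assuming a
StreamsBuilder-based app):

    Topology topology = builder.build();
    // describe() prints the sub-topologies, whose indexes form the
    // task ids (0_*, 1_*, ...):
    System.out.println(topology.describe());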

Thanks,
-John

On Fri, Mar 1, 2019 at 11:33 AM Jonathan Santilli <
jonathansantilli@gmail.com> wrote:


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Jonathan Santilli <jo...@gmail.com>.
BTW, after stopping the app gracefully (KafkaStreams#close()), this error
shows up repeatedly:

2019-03-01 17:18:07,819 WARN
[XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
internals.ProcessorStateManager (ProcessorStateManager.java:327) - task
[0_0] Failed to write offset checkpoint file to
[/tmp/kafka-stream/XXX/0_0/.checkpoint]

java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp
(No such file or directory)

at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]

at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]

at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]

at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]

at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(
OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(
ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]

at org.apache.kafka.streams.processor.internals.StreamTask.suspend(
StreamTask.java:599) [kafka-streams-2.2.0.jar:?]

at org.apache.kafka.streams.processor.internals.StreamTask.close(
StreamTask.java:721) [kafka-streams-2.2.0.jar:?]

at org.apache.kafka.streams.processor.internals.AssignedTasks.close(
AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]

at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(
TaskManager.java:267) [kafka-streams-2.2.0.jar:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(
StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]

at org.apache.kafka.streams.processor.internals.StreamThread.run(
StreamThread.java:786) [kafka-streams-2.2.0.jar:?]


However, I have checked, and the folders created start with *1_*:

ls -lha /tmp/kafka-stream/XXX/1_1
total 8
drwxr-xr-x   5 a  b   160B  1 Mar 17:18 .
drwxr-xr-x  34 a  b   1.1K  1 Mar 17:15 ..
-rw-r--r--   1 a  b   2.9K  1 Mar 17:18 .checkpoint
-rw-r--r--   1 a  b     0B  1 Mar 16:05 .lock
drwxr-xr-x   3 a  b    96B  1 Mar 16:43
KSTREAM-REDUCE-STATE-STORE-0000000005
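
For reference, the close itself follows the standard pattern (a sketch;
the actual app wiring differs):

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
    // Close gracefully on shutdown so stores are flushed and checkpoints written:
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));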



Cheers!
--
Jonathan



On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <jo...@gmail.com>
wrote:



-- 
Santilli Jonathan