Posted to users@kafka.apache.org by John Roesler <jo...@confluent.io> on 2019/01/07 20:10:46 UTC
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Hi Peter,
Sorry, I have only just now seen this thread.
You asked if this behavior is unexpected, and the answer is yes.
Suppressed.untilWindowCloses is intended to emit only the final result,
regardless of restarts.
You also asked how the suppression buffer can resume after a restart, since
it's not persistent.
The answer is the same as for in-memory stores. The state of the store (or
buffer, in this case) is persisted to a changelog topic, which is re-read on
restart to re-create the exact state prior to shutdown.
"Persistent" in the store nomenclature refers only to "persistent on the
local disk".
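To make the restore idea concrete, here is a minimal sketch in plain Java. It
is only a model: a list stands in for the changelog topic and a map for the
in-memory buffer, and the names ChangelogReplaySketch, ChangelogEntry, and
restore are made up for illustration. They are not Kafka Streams APIs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: plain collections stand in for the changelog
// topic and the in-memory buffer. None of these names are Kafka Streams APIs.
final class ChangelogReplaySketch {

    // One changelog record; a null value is a tombstone that deletes the key.
    static final class ChangelogEntry {
        final String key;
        final Long value;

        ChangelogEntry(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Rebuild the in-memory state by applying every entry in offset order.
    static Map<String, Long> restore(List<ChangelogEntry> changelog) {
        Map<String, Long> state = new LinkedHashMap<>();
        for (ChangelogEntry entry : changelog) {
            if (entry.value == null) {
                state.remove(entry.key);
            } else {
                state.put(entry.key, entry.value);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<ChangelogEntry> changelog = Arrays.asList(
            new ChangelogEntry("a", 1L),    // "a" is buffered
            new ChangelogEntry("b", 5L),    // "b" is buffered
            new ChangelogEntry("a", 3L),    // "a" is updated
            new ChangelogEntry("b", null)); // "b" was emitted and evicted

        // After a restart the map starts empty; replaying the log restores it.
        System.out.println(restore(changelog)); // prints {a=3}
    }
}
```

The point of the sketch is that nothing needs to survive on local disk: as
long as the changelog is complete, replaying it yields the pre-shutdown state.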
Just to confirm your response regarding the buffer size:
While it is better to use the public ("Suppressed.unbounded()") API, yes,
your buffer was already unbounded.
I looked at your custom transformer, and it looks almost correct to me. The
only flaw seems to be that it only looks for closed windows for the key
currently being processed, which means that
if you have key "A" buffered, but don't get another event for it for a
while after the window closes, you won't emit the final result. This might
actually take longer than the window retention period, in which case, the
data would be deleted without ever emitting the final result.
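One way to avoid that flaw is to check every buffered window whenever stream
time advances, not only the window of the key being processed. The following
is a rough plain-Java sketch of that idea, not the transformer from the
report: the class WindowCloseSweepSketch, its process method, and the
"window end plus grace" close condition are assumptions made for illustration,
and late records for already-closed windows are assumed to be dropped upstream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; hypothetical names, not Kafka Streams APIs.
final class WindowCloseSweepSketch {

    // Latest buffered value for one key and window.
    private static final class Buffered {
        final String key;
        final long windowEnd;
        String value;

        Buffered(String key, long windowEnd, String value) {
            this.key = key;
            this.windowEnd = windowEnd;
            this.value = value;
        }
    }

    private final Map<String, Buffered> buffer = new LinkedHashMap<>();
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;

    WindowCloseSweepSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    // Buffer the update, advance stream time, then sweep ALL buffered
    // windows, not just the one that belongs to the current key.
    List<String> process(String key, long windowEnd, String value, long timestamp) {
        String id = key + "@" + windowEnd;
        Buffered existing = buffer.get(id);
        if (existing == null) {
            buffer.put(id, new Buffered(key, windowEnd, value));
        } else {
            existing.value = value;
        }
        streamTime = Math.max(streamTime, timestamp);

        List<String> finalResults = new ArrayList<>();
        Iterator<Buffered> it = buffer.values().iterator();
        while (it.hasNext()) {
            Buffered b = it.next();
            if (b.windowEnd + graceMs <= streamTime) { // window is closed
                finalResults.add(b.key + "@" + b.windowEnd + "=" + b.value);
                it.remove();
            }
        }
        return finalResults;
    }

    public static void main(String[] args) {
        WindowCloseSweepSketch sketch = new WindowCloseSweepSketch(0L);
        System.out.println(sketch.process("A", 2000L, "a1", 1000L)); // prints []
        // A record for key "B" advances stream time past A's window end,
        // so A's final result is emitted even though "A" was not seen again.
        System.out.println(sketch.process("B", 4000L, "b1", 2500L)); // prints [A@2000=a1]
    }
}
```

A full scan on every record is simple but linear in the buffer size; keeping
the buffer ordered by window end would let the sweep stop at the first window
that is still open.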
You said you think it should be possible to get the DSL version working,
and I agree, since this is exactly what it was designed for. Do you mind
filing a bug in the "KAFKA" Jira project (
https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to
keep the investigation organized that way.
In the meantime, I'll take another look at your logs above and try to
reason about what could be wrong.
Just one clarification... For example, you showed
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
> 272, 548, 172], sum: 138902
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
> 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164
Am I correct in thinking that the first, shorter list is the "incremental"
version, and the second is the "final" version? I think so, but am confused
by "INSTEAD OF".
Thanks for the report,
-John
On Wed, Dec 26, 2018 at 3:21 AM Peter Levart <pe...@gmail.com> wrote:
>
>
> On 12/21/18 3:16 PM, Peter Levart wrote:
> > I also see some results that are actual non-final window aggregations
> > that precede the final aggregations. These non-final results are never
> > emitted out of order (for example, no such non-final result would ever
> > come after the final result for a particular key/window).
>
> Absence of proof is not the proof of absence... And I have later
> observed (using the DSL variant, not the custom Transformer) an
> occurrence of a non-final result that was emitted after restart of
> streams processor while the final result for the same key/window had
> been emitted before the restart:
>
> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550,
> 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
> ...
> ... restart ...
> ...
> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550]
> INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648
>
>
> The app logic cannot even rely on a guarantee that results are ordered,
> then. This is really not usable until the bug is fixed.
>
> Regards, Peter
>
>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by Jonathan Santilli <jo...@gmail.com>.
Sure John, I will document it.
Thanks a lot for your reply.
--
Jonathan
On Tue, Mar 5, 2019 at 7:38 PM John Roesler <jo...@confluent.io> wrote:
> Hi Jonathan,
>
> Just a quick update: I have not been able to reproduce the duplicates issue
> with the 2.2 RC, even with a topology very similar to the one you included
> in your stackoverflow post.
>
> I think we should treat this as a new bug. Would you mind opening a new
> Jira bug ticket with some steps to reproduce the problem, and also exactly
> the behavior you observe?
>
> Thanks,
> -John
--
Santilli Jonathan
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by John Roesler <jo...@confluent.io>.
Hi Jonathan,
Just a quick update: I have not been able to reproduce the duplicates issue
with the 2.2 RC, even with a topology very similar to the one you included
in your stackoverflow post.
I think we should treat this as a new bug. Would you mind opening a new
Jira bug ticket with some steps to reproduce the problem, and also exactly
the behavior you observe?
Thanks,
-John
On Mon, Mar 4, 2019 at 10:41 PM John Roesler <jo...@confluent.io> wrote:
> Hi Jonathan,
>
> Sorry to hear that the feature is causing you trouble as well, and that
> the 2.2 release candidate didn't seem to fix it.
>
> I'll try and do a repro based on the code in your SO post tomorrow.
>
> I don't think it's related to the duplicates, but that shutdown error is
> puzzling. Can you print the topology (with topology.describe())? This
> will tell us what is in task 1 (i.e., *1_*) of your program.
>
> Thanks,
> -John
>
> On Fri, Mar 1, 2019 at 11:33 AM Jonathan Santilli <
> jonathansantilli@gmail.com> wrote:
>
>> BTW, after stopping the app gracefully (KafkaStreams#close()), this error shows
>> up repeatedly:
>>
>> 2019-03-01 17:18:07,819 WARN
>> [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
>> internals.ProcessorStateManager (ProcessorStateManager.java:327) - task
>> [0_0] Failed to write offset checkpoint file to
>> [/tmp/kafka-stream/XXX/0_0/.checkpoint]
>>
>> java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp (No such file or directory)
>>     at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
>>     at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
>>     at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
>>     at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
>>     at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
>>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
>>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
>>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
>>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
>>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]
>>
>>
>> However, I have checked and the folder created starts with: *1_*
>>
>> ls -lha /tmp/kafka-stream/XXX/1_1
>> total 8
>> drwxr-xr-x 5 a b 160B 1 Mar 17:18 .
>> drwxr-xr-x 34 a b 1.1K 1 Mar 17:15 ..
>> -rw-r--r-- 1 a b 2.9K 1 Mar 17:18 .checkpoint
>> -rw-r--r-- 1 a b 0B 1 Mar 16:05 .lock
>> drwxr-xr-x 3 a b 96B 1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-0000000005
>>
>>
>>
>> Cheers!
>> --
>> Jonathan
>>
>>
>>
>> On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <
>> jonathansantilli@gmail.com>
>> wrote:
>>
>> > Hello John, hope you are well.
>> > I have tested the version 2.2 release candidate (although I know it has
>> > been postponed).
>> > I have been following this email thread because I think I am
>> > experiencing the same issue. I have reported it in an email to this
>> > list, and all the details are also on SO (
>> > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
>> > ).
>> >
>> > After the test, the result is the same as before (at least in my case):
>> > already processed records are passed again to the output topic, causing
>> > data duplication:
>> >
>> > ...
>> > 2019-03-01 16:55:23,808 INFO
>> > [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
>> > internals.StoreChangelogReader (StoreChangelogReader.java:221) -
>> > stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
>> > No checkpoint found for task 1_10 state store
>> > KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
>> > XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS turned
>> > on. *Reinitializing the task and restore its state from the beginning.*
>> >
>> > ...
>> >
>> >
>> > I was hoping this would be fixed, but that is not the case, at least for
>> > me.
>> >
>> > If you can, please take a look at the question on SO. I was in contact
>> > with Matthias about it, and he also pointed me to the place where the
>> > potential bug could be present.
>> >
>> > Please, let me know any thoughts.
>> >
>> >
>> > Cheers!
>> > --
>> > Jonathan
>> >
>> >
>> > On Tue, Feb 26, 2019 at 5:23 PM John Roesler <jo...@confluent.io> wrote:
>> >
>> >> Hi again, Peter,
>> >>
>> >> Just to close the loop about the bug in Suppress, we did get
>> >> (apparently) the same report from a few other people:
>> >> https://issues.apache.org/jira/browse/KAFKA-7895
>> >>
>> >> I also managed to reproduce the duplicate-result behavior, which could
>> >> cause it to emit both intermediate results and duplicate final results.
>> >>
>> >> There's a patch for it in the 2.2 release candidate. Perhaps you can
>> >> try it out and see if it resolves the issue for you?
>> >>
>> >> I'm backporting the fix to 2.1 as well, but I unfortunately missed the
>> >> last 2.1 bugfix release.
>> >>
>> >> Thanks,
>> >> -John
>> >>
>> >> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <jo...@confluent.io> wrote:
>> >>
>> >> > Hi Peter,
>> >> >
>> >> > Thanks for the replies.
>> >> >
>> >> > Regarding transactions:
>> >> > Yes, actually, with EOS enabled, the changelog and the output topics
>> >> > are all produced with the same transactional producer, within the same
>> >> > transactions. So it should already be atomic.
>> >> >
>> >> > Regarding restore:
>> >> > Streams doesn't put the store into service until the restore is
>> >> > completed, so it should be guaranteed not to happen. But there's of
>> >> > course no guarantee that I didn't mess something up. I'll take a hard
>> >> > look at it.
>> >> >
>> >> > Regarding restoration and offsets:
>> >> > Your guess is correct: Streams tracks the latest stored offset outside
>> >> > of the store implementation itself, specifically by writing a file
>> >> > (called a Checkpoint File) in the state directory. If the file is
>> >> > there, it reads that offset and restores from that point. If the file
>> >> > is missing, it restores from the beginning of the changelog. So it
>> >> > should "just work" for you. Just for completeness, there have been
>> >> > several edge cases discovered where this mechanism isn't completely
>> >> > safe, so in the case of EOS, I believe we actually disregard that
>> >> > checkpoint file and the prior state and always rebuild from the
>> >> > earliest offset in the changelog.
>> >> >
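The checkpoint behaviour described in the quoted paragraph above can be
sketched in plain Java. This is only a model of the description, not the
Streams implementation: CheckpointRestoreSketch and its methods are
hypothetical names, the changelog is a list of "key=value" strings indexed by
offset, the checkpoint is assumed to hold the next offset to read, and the EOS
special case is left out because the description itself only hedges it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Illustrative model only; hypothetical names, not Kafka Streams APIs.
final class CheckpointRestoreSketch {

    // Resume from the checkpointed offset if the checkpoint file existed,
    // otherwise from the beginning of the changelog (offset 0 in this model).
    static long startOffset(OptionalLong checkpoint) {
        return checkpoint.orElse(0L);
    }

    // Apply changelog entries from the start offset on top of whatever
    // local state survived the restart.
    static Map<String, String> restore(Map<String, String> localState,
                                       List<String> changelog,
                                       OptionalLong checkpoint) {
        Map<String, String> state = new HashMap<>(localState);
        for (int offset = (int) startOffset(checkpoint); offset < changelog.size(); offset++) {
            String[] keyAndValue = changelog.get(offset).split("=", 2);
            state.put(keyAndValue[0], keyAndValue[1]);
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> changelog = Arrays.asList("a=1", "b=2", "a=3");

        // Case 1: local state and checkpoint survived; only offset 2 is replayed.
        Map<String, String> survived = new HashMap<>();
        survived.put("a", "1");
        survived.put("b", "2");
        Map<String, String> resumed = restore(survived, changelog, OptionalLong.of(2L));

        // Case 2: no checkpoint file; the whole changelog is replayed.
        Map<String, String> rebuilt =
            restore(new HashMap<>(), changelog, OptionalLong.empty());

        System.out.println(resumed.equals(rebuilt)); // prints true
    }
}
```

Both paths end in the same state; the checkpoint only saves replay work, which
is why a missing file costs time but should not cost correctness.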
>> >> > Personally, I would like to see us provide the ability to store the
>> >> > checkpoint inside the state store, so that checkpoint updates are
>> >> > linearized correctly w.r.t. data updates, but I actually haven't
>> >> > mentioned this thought to anyone until now ;)
>> >> >
>> >> > Finally, regarding your prior email:
>> >> > Yes, I was thinking that the "wrong" output values might be part of
>> >> > rolled-back transactions, and therefore enabling read-committed mode
>> >> > on the consumer might tell a different story than what you've seen to
>> >> > date.
>> >> >
>> >> > I'm honestly still baffled about those intermediate results that are
>> >> > sneaking out. I wonder if it's something specific to your data stream,
>> >> > like maybe an edge case when two records have exactly the same
>> >> > timestamp? I'll have to stare at the code some more...
>> >> >
>> >> > Regardless, in order to reap the benefits of running the app with EOS,
>> >> > you really have to also set your consumers to read_committed.
>> >> > Otherwise, you'll be seeing output data from aborted (aka rolled-back)
>> >> > transactions, and you miss the intended "exactly once" guarantee.
>> >> >
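To illustrate why the consumer's isolation level matters here, the following
plain-Java sketch models an output topic that contains the same final result
twice: once in an aborted transaction (T1) and once in a committed one (T2).
On a real consumer the relevant setting is isolation.level=read_committed;
everything else in the sketch (IsolationLevelSketch, OutputRecord, the two
read methods) is a hypothetical simplification that ignores still-open
transactions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only: a list stands in for the output topic, and each
// record carries the final outcome of its transaction. Hypothetical names.
final class IsolationLevelSketch {

    static final class OutputRecord {
        final String value;
        final String transaction;
        final boolean aborted;

        OutputRecord(String value, String transaction, boolean aborted) {
            this.value = value;
            this.transaction = transaction;
            this.aborted = aborted;
        }
    }

    // Models read_uncommitted: every record is returned, aborted or not.
    static List<String> readUncommitted(List<OutputRecord> topic) {
        List<String> seen = new ArrayList<>();
        for (OutputRecord record : topic) {
            seen.add(record.value + " (" + record.transaction + ")");
        }
        return seen;
    }

    // Models read_committed: records from aborted transactions are skipped.
    static List<String> readCommitted(List<OutputRecord> topic) {
        List<String> seen = new ArrayList<>();
        for (OutputRecord record : topic) {
            if (!record.aborted) {
                seen.add(record.value + " (" + record.transaction + ")");
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<OutputRecord> topic = Arrays.asList(
            new OutputRecord("X", "T1", true),   // emitted, then crash: T1 aborted
            new OutputRecord("X", "T2", false)); // re-emitted after restart, committed

        System.out.println(readUncommitted(topic)); // prints [X (T1), X (T2)]
        System.out.println(readCommitted(topic));   // prints [X (T2)]
    }
}
```

With the default read_uncommitted behaviour the duplicate from T1 is visible,
which matches the kind of duplicate discussed in this thread; it does not by
itself explain intermediate (non-final) results.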
>> >> > Thanks,
>> >> > -John
>> >> >
>> >> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <peter.levart@gmail.com> wrote:
>> >> >
>> >> >> Hi John,
>> >> >>
>> >> >> Haven't been able to reinstate the demo yet, but I have been
>> >> >> re-reading the following scenario of yours....
>> >> >>
>> >> >> On 1/24/19 11:48 PM, Peter Levart wrote:
>> >> >> > Hi John,
>> >> >> >
>> >> >> > On 1/24/19 3:18 PM, John Roesler wrote:
>> >> >> >
>> >> >> >>
>> >> >> >> The reason is that, upon restart, the suppression buffer can only
>> >> >> >> "remember" what got sent & committed to its changelog topic
>> >> >> >> before.
>> >> >> >>
>> >> >> >> The scenario I have in mind is:
>> >> >> >>
>> >> >> >> ...
>> >> >> >> * buffer state X
>> >> >> >> ...
>> >> >> >> * flush state X to buffer changelog
>> >> >> >> ...
>> >> >> >> * commit transaction T0; start new transaction T1
>> >> >> >> ...
>> >> >> >> * emit final result X (in uncommitted transaction T1)
>> >> >> >> ...
>> >> >> >> * crash before flushing to the changelog the fact that state X
>> >> >> >>   was emitted. Also, transaction T1 gets aborted, since we crash
>> >> >> >>   before committing.
>> >> >> >> ...
>> >> >> >> * restart, restoring state X again from the changelog (because
>> >> >> >>   the emit didn't get committed)
>> >> >> >> * start transaction T2
>> >> >> >> * emit final result X again (in uncommitted transaction T2)
>> >> >> >> ...
>> >> >> >> * commit transaction T2
>> >> >> >> ...
>> >> >> >>
>> >> >> >> So, the result gets emitted twice, but the first time is in an
>> >> >> >> aborted transaction. This leads me to another clarifying question:
>> >> >> >>
>> >> >> >> Based on your first message, it seems like the duplicates you
>> >> >> >> observe are in the output topic. When you read the topic, do you
>> >> >> >> configure your consumer with "read committed" mode? If not, you'll
>> >> >> >> see "results" from uncommitted transactions, which could explain
>> >> >> >> the duplicates.
>> >> >>
>> >> >> ...and I was thinking that perhaps the right solution to the
>> >> >> suppression problem would be to use transactional producers for the
>> >> >> resulting output topic AND the store change-log. Is this possible?
>> >> >> Does the compaction of the log on the brokers work for transactional
>> >> >> producers as expected? In that case, the sending of the final result
>> >> >> and the marking of that fact in the store change log would together
>> >> >> be an atomic operation.
>> >> >> That said, I think there's another problem with suppression, which
>> >> >> looks like the suppression processor is already processing the input
>> >> >> while the state store has not been fully restored yet, or something
>> >> >> related... Is this guaranteed not to happen?
>> >> >>
>> >> >> And now something unrelated I wanted to ask...
>> >> >>
>> >> >> I'm trying to create my own custom state store. From the API I can
>> >> >> see it is pretty straightforward. One thing that I don't quite
>> >> >> understand is how Kafka Streams knows whether to replay the whole
>> >> >> change log after the store registers itself or just a part of it, and
>> >> >> which part (from which offset per partition). There doesn't seem to
>> >> >> be any API point through which the store could communicate this
>> >> >> information back to Kafka Streams. Is such bookkeeping performed
>> >> >> outside the store? Does Kafka Streams first invoke flush() on the
>> >> >> store and then note down the offsets from the change log producer
>> >> >> somewhere? So next time the store is brought up, the log is only
>> >> >> replayed from the last noted-down offset? So it can happen that the
>> >> >> store gets some log entries that have already been incorporated in it
>> >> >> (from the point of one flush before) but never misses any... In any
>> >> >> case there has to be an indication somewhere that the store didn't
>> >> >> survive and has to be rebuilt from scratch. How does Kafka Streams
>> >> >> detect that situation? By placing some marker file into the directory
>> >> >> reserved for the store's local storage?
>> >> >>
>> >> >> Regards, Peter
>> >> >>
>> >> >>
>> >>
>> >
>> >
>> > --
>> > Santilli Jonathan
>> >
>>
>>
>> --
>> Santilli Jonathan
>>
>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by John Roesler <jo...@confluent.io>.
Hi Jonathan,
Sorry to hear that the feature is causing you trouble as well, and that the
2.2 release candidate didn't seem to fix it.
I'll try and do a repro based on the code in your SO post tomorrow.
I don't think it's related to the duplicates, but that shutdown error is
puzzling. Can you print the topology (with topology.describe())? This
will tell us what is in task 1 (i.e., *1_*) of your program.
Thanks,
-John
On Fri, Mar 1, 2019 at 11:33 AM Jonathan Santilli <
jonathansantilli@gmail.com> wrote:
> BTW, after stopping the app gracefully (KafkaStreams#close()), this error shows
> up repeatedly:
>
> 2019-03-01 17:18:07,819 WARN
> [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> internals.ProcessorStateManager (ProcessorStateManager.java:327) - task
> [0_0] Failed to write offset checkpoint file to
> [/tmp/kafka-stream/XXX/0_0/.checkpoint]
>
> java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp (No such file or directory)
>     at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
>     at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
>     at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
>     at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
>     at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]
>
>
> However, I have checked and the folder created starts with: *1_*
>
> ls -lha /tmp/kafka-stream/XXX/1_1
> total 8
> drwxr-xr-x 5 a b 160B 1 Mar 17:18 .
> drwxr-xr-x 34 a b 1.1K 1 Mar 17:15 ..
> -rw-r--r-- 1 a b 2.9K 1 Mar 17:18 .checkpoint
> -rw-r--r-- 1 a b 0B 1 Mar 16:05 .lock
> drwxr-xr-x 3 a b 96B 1 Mar 16:43
> KSTREAM-REDUCE-STATE-STORE-0000000005
>
>
>
> Cheers!
> --
> Jonathan
>
>
>
> On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <
> jonathansantilli@gmail.com>
> wrote:
>
> > Hello John, hope you are well.
> > I have tested the version 2.2 release candidate (although I know it has
> > been postponed).
> > I have been following this email thread because I think I am experiencing
> > the same issue. I reported it in an email to this list, and all the
> > details are also on SO (
> >
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> > ).
> >
> > After the test, the result is the same as before (at least for my case),
> > already processed records are passed again to the output topic causing
> the
> > data duplication:
> >
> > ...
> > 2019-03-01 16:55:23,808 INFO
> [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> > internals.StoreChangelogReader (StoreChangelogReader.java:221) -
> > stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> No
> > checkpoint found for task 1_10 state store
> > KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> > XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS turned
> on. *Reinitializing
> > the task and restore its state from the beginning.*
> >
> > ...
> >
> >
> > I was hoping for this to be fixed, but is not the case, at least for my
> > case.
> >
> > If you can, please take a look at the question on SO. I was in contact
> > with Matthias about it, and he also pointed me to the place where the
> > potential bug could be.
> >
> > Please, let me know any thoughts.
> >
> >
> > Cheers!
> > --
> > Jonathan
> >
> >
> > On Tue, Feb 26, 2019 at 5:23 PM John Roesler <jo...@confluent.io> wrote:
> >
> >> Hi again, Peter,
> >>
> >> Just to close the loop about the bug in Suppress, we did get the
> >> (apparent)
> >> same report from a few other people:
> >> https://issues.apache.org/jira/browse/KAFKA-7895
> >>
> >> I also managed to reproduce the duplicate-result behavior, which could
> >> cause it to emit both intermediate results and duplicate final results.
> >>
> >> There's a patch for it in the 2.2 release candidate. Perhaps you can try
> >> it
> >> out and see if it resolves the issue for you?
> >>
> >> I'm backporting the fix to 2.1 as well, but I unfortunately missed the
> >> last
> >> 2.1 bugfix release.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <jo...@confluent.io>
> wrote:
> >>
> >> > Hi Peter,
> >> >
> >> > Thanks for the replies.
> >> >
> >> > Regarding transactions:
> >> > Yes, actually, with EOS enabled, the changelog and the output topics
> are
> >> > all produced with the same transactional producer, within the same
> >> > transactions. So it should already be atomic.
> >> >
> >> > Regarding restore:
> >> > Streams doesn't put the store into service until the restore is
> >> completed,
> >> > so it should be guaranteed not to happen. But there's of course no
> >> > guarantee that I didn't mess something up. I'll take a hard look at
> it.
> >> >
> >> > Regarding restoration and offsets:
> >> > Your guess is correct: Streams tracks the latest stored offset outside
> >> of
> >> > the store implementation itself, specifically by writing a file
> (called
> >> a
> >> > Checkpoint File) in the state directory. If the file is there, it
> reads
> >> > that offset and restores from that point. If the file is missing, it
> >> > restores from the beginning of the stream. So it should "just work"
> for
> >> > you. Just for completeness, there have been several edge cases
> >> discovered
> >> > where this mechanism isn't completely safe, so in the case of EOS, I
> >> > believe we actually disregard that checkpoint file and the prior state
> >> and
> >> > always rebuild from the earliest offset in the changelog.
> >> >
> >> > Personally, I would like to see us provide the ability to store the
> >> > checkpoint inside the state store, so that checkpoint updates are
> >> > linearized correctly w.r.t. data updates, but I actually haven't
> >> mentioned
> >> > this thought to anyone until now ;)
> >> >
> >> > Finally, regarding your prior email:
> >> > Yes, I was thinking that the "wrong" output values might be part of
> >> > rolled-back transactions and therefore enabling read-committed mode on
> >> the
> >> > consumer might tell a different story than what you've seen to date.
> >> >
> >> > I'm honestly still baffled about those intermediate results that are
> >> > sneaking out. I wonder if it's something specific to your data stream,
> >> like
> >> > maybe an edge case when two records have exactly the
> >> same
> >> > timestamp? I'll have to stare at the code some more...
> >> >
> >> > Regardless, in order to reap the benefits of running the app with EOS,
> >> you
> >> > really have to also set your consumers to read_committed. Otherwise,
> >> you'll
> >> > be seeing output data from aborted (aka rolled-back) transactions, and
> >> you
> >> > miss the intended "exactly once" guarantee.
> >> >
> >> > Thanks,
> >> > -John
> >> >
> >> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <pe...@gmail.com>
> >> > wrote:
> >> >
> >> >> Hi John,
> >> >>
> >> >> Haven't been able to reinstate the demo yet, but I have been
> re-reading
> >> >> the following scenario of yours....
> >> >>
> >> >> On 1/24/19 11:48 PM, Peter Levart wrote:
> >> >> > Hi John,
> >> >> >
> >> >> > On 1/24/19 3:18 PM, John Roesler wrote:
> >> >> >
> >> >> >>
> >> >> >> The reason is that, upon restart, the suppression buffer can only
> >> >> >> "remember" what got sent & committed to its changelog topic
> before.
> >> >> >>
> >> >> >> The scenario I have in mind is:
> >> >> >>
> >> >> >> ...
> >> >> >> * buffer state X
> >> >> >> ...
> >> >> >> * flush state X to buffer changelog
> >> >> >> ...
> >> >> >> * commit transaction T0; start new transaction T1
> >> >> >> ...
> >> >> >> * emit final result X (in uncommitted transaction T1)
> >> >> >> ...
> >> >> >> * crash before flushing to the changelog the fact that state X was
> >> >> >> emitted.
> >> >> >> Also, transaction T1 gets aborted, since we crash before
> committing.
> >> >> >> ...
> >> >> >> * restart, restoring state X again from the changelog (because the
> >> emit
> >> >> >> didn't get committed)
> >> >> >> * start transaction T2
> >> >> >> * emit final result X again (in uncommitted transaction T2)
> >> >> >> ...
> >> >> >> * commit transaction T2
> >> >> >> ...
> >> >> >>
> >> >> >> So, the result gets emitted twice, but the first time is in an
> >> aborted
> >> >> >> transaction. This leads me to another clarifying question:
> >> >> >>
> >> >> >> Based on your first message, it seems like the duplicates you
> >> observe
> >> >> >> are
> >> >> >> in the output topic. When you read the topic, do you configure
> your
> >> >> >> consumer with "read committed" mode? If not, you'll see "results"
> >> from
> >> >> >> uncommitted transactions, which could explain the duplicates.
> >> >>
> >> >> ...and I was thinking that perhaps the right solution to the
> >> suppression
> >> >> problem would be to use transactional producers for the resulting
> >> output
> >> >> topic AND the store change-log. Is this possible? Does the compaction
> >> of
> >> >> the log on the brokers work for transactional producers as expected?
> In
> >> >> that case, the sending of final result and the marking of that fact
> in
> >> >> the store change log would together be an atomic operation.
> >> >> That said, I think there's another problem with suppression which
> looks
> >> >> like the suppression processor is already processing the input while
> the
> >> >> state store has not been fully restored yet or something related...
> Is
> >> >> this guaranteed not to happen?
> >> >>
> >> >> And now something unrelated I wanted to ask...
> >> >>
> >> >> I'm trying to create my own custom state store. From the API I can
> see
> >> >> it is pretty straightforward. One thing that I don't quite understand
> >> is
> >> >> how Kafka Streams know whether to replay the whole change log after
> the
> >> >> store registers itself or just a part of it and which part (from
> which
> >> >> offset per partition). There doesn't seem to be any API point through
> >> >> which the store could communicate this information back to Kafka
> >> >> Streams. Is such bookkeeping performed outside the store? Does Kafka
> >> >> Streams first invoke flush() on the store and then notes down the
> >> >> offsets from the change log producer somewhere? So next time the
> store
> >> >> is brought up, the log is only replayed from last noted down offset?
> So
> >> >> it can happen that the store gets some log entries that have already
> >> >> been incorporated in it (from the point of one flush before) but
> never
> >> >> misses any... In any case there has to be an indication somewhere
> that
> >> >> the store didn't survive and has to be rebuilt from scratch. How do
> >> >> Kafka Streams detect that situation? By placing some marker file into
> >> >> the directory reserved for store's local storage?
> >> >>
> >> >> Regards, Peter
> >> >>
> >> >>
> >>
> >
> >
> > --
> > Santilli Jonathan
> >
>
>
> --
> Santilli Jonathan
>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by Jonathan Santilli <jo...@gmail.com>.
BTW, after stopping the app gracefully (KafkaStreams#close()), this error shows
up repeatedly:
2019-03-01 17:18:07,819 WARN [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] internals.ProcessorStateManager (ProcessorStateManager.java:327) - task [0_0] Failed to write offset checkpoint file to [/tmp/kafka-stream/XXX/0_0/.checkpoint]
java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
    at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
    at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]
However, I have checked and the folder created starts with: *1_*
ls -lha /tmp/kafka-stream/XXX/1_1
total 8
drwxr-xr-x 5 a b 160B 1 Mar 17:18 .
drwxr-xr-x 34 a b 1.1K 1 Mar 17:15 ..
-rw-r--r-- 1 a b 2.9K 1 Mar 17:18 .checkpoint
-rw-r--r-- 1 a b 0B 1 Mar 16:05 .lock
drwxr-xr-x 3 a b 96B 1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-0000000005
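The trace above shows the failure happening when FileOutputStream opens .checkpoint.tmp inside the 0_0 task directory. One reading, offered as an assumption rather than a diagnosis, is that the 0_0 directory did not exist at that moment, because opening a file for writing inside a missing directory fails in exactly this way. The sketch below reproduces that effect with plain Java. CheckpointWriteDemo and its methods are hypothetical names, and the write-to-temp-then-rename pattern is only inferred from the .tmp suffix in the trace.

```java
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative only: a "write temp file, then rename" writer, not Kafka Streams code.
final class CheckpointWriteDemo {

    // Writes content to <target>.tmp and then moves it over <target>.
    static void writeCheckpoint(Path target, String content) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        // Throws FileNotFoundException when the parent directory does not exist.
        try (FileOutputStream out = new FileOutputStream(tmp.toFile())) {
            out.write(content.getBytes(StandardCharsets.UTF_8));
        }
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    }

    // Returns true if the checkpoint was written, false if the directory was missing.
    static boolean tryWrite(Path taskDir) {
        try {
            writeCheckpoint(taskDir.resolve(".checkpoint"), "demo\n");
            return true;
        } catch (FileNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Tries one existing task directory (1_1) and one that was never created (0_0).
    static boolean[] demo() {
        try {
            Path root = Files.createTempDirectory("kafka-stream-demo");
            Path existing = Files.createDirectories(root.resolve("1_1"));
            Path missing = root.resolve("0_0");
            return new boolean[] {tryWrite(existing), tryWrite(missing)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("1_1 checkpoint written: " + result[0]);
        System.out.println("0_0 checkpoint written: " + result[1]);
    }
}
```

If this reading is right, the interesting question is why a task 0_0 is being closed without a matching directory on disk, which the topology description may help answer.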
Cheers!
--
Jonathan
On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <jo...@gmail.com>
wrote:
> Hello John, hope you are well.
> I have tested the version 2.2 release candidate (although I know it has
> been postponed).
> I have been following this email thread because I think I am experiencing
> the same issue. I reported it in an email to this list, and all the
> details are also on SO (
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> ).
>
> After the test, the result is the same as before (at least for my case),
> already processed records are passed again to the output topic causing the
> data duplication:
>
> ...
> 2019-03-01 16:55:23,808 INFO [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> internals.StoreChangelogReader (StoreChangelogReader.java:221) -
> stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No
> checkpoint found for task 1_10 state store
> KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS turned on. *Reinitializing
> the task and restore its state from the beginning.*
>
> ...
>
>
> I was hoping for this to be fixed, but is not the case, at least for my
> case.
>
> If you can, please take a look at the question on SO. I was in contact
> with Matthias about it, and he also pointed me to the place where the
> potential bug could be.
>
> Please, let me know any thoughts.
>
>
> Cheers!
> --
> Jonathan
>
>
> On Tue, Feb 26, 2019 at 5:23 PM John Roesler <jo...@confluent.io> wrote:
>
>> Hi again, Peter,
>>
>> Just to close the loop about the bug in Suppress, we did get the
>> (apparent)
>> same report from a few other people:
>> https://issues.apache.org/jira/browse/KAFKA-7895
>>
>> I also managed to reproduce the duplicate-result behavior, which could
>> cause it to emit both intermediate results and duplicate final results.
>>
>> There's a patch for it in the 2.2 release candidate. Perhaps you can try
>> it
>> out and see if it resolves the issue for you?
>>
>> I'm backporting the fix to 2.1 as well, but I unfortunately missed the
>> last
>> 2.1 bugfix release.
>>
>> Thanks,
>> -John
>>
>> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <jo...@confluent.io> wrote:
>>
>> > Hi Peter,
>> >
>> > Thanks for the replies.
>> >
>> > Regarding transactions:
>> > Yes, actually, with EOS enabled, the changelog and the output topics are
>> > all produced with the same transactional producer, within the same
>> > transactions. So it should already be atomic.
>> >
>> > Regarding restore:
>> > Streams doesn't put the store into service until the restore is
>> completed,
>> > so it should be guaranteed not to happen. But there's of course no
>> > guarantee that I didn't mess something up. I'll take a hard look at it.
>> >
>> > Regarding restoration and offsets:
>> > Your guess is correct: Streams tracks the latest stored offset outside
>> of
>> > the store implementation itself, specifically by writing a file (called
>> a
>> > Checkpoint File) in the state directory. If the file is there, it reads
>> > that offset and restores from that point. If the file is missing, it
>> > restores from the beginning of the stream. So it should "just work" for
>> > you. Just for completeness, there have been several edge cases
>> discovered
>> > where this mechanism isn't completely safe, so in the case of EOS, I
>> > believe we actually disregard that checkpoint file and the prior state
>> and
>> > always rebuild from the earliest offset in the changelog.
>> >
>> > Personally, I would like to see us provide the ability to store the
>> > checkpoint inside the state store, so that checkpoint updates are
>> > linearized correctly w.r.t. data updates, but I actually haven't
>> mentioned
>> > this thought to anyone until now ;)
>> >
>> > Finally, regarding your prior email:
>> > Yes, I was thinking that the "wrong" output values might be part of
>> > rolled-back transactions and therefore enabling read-committed mode on
>> the
>> > consumer might tell a different story than what you've seen to date.
>> >
>> > I'm honestly still baffled about those intermediate results that are
>> > sneaking out. I wonder if it's something specific to your data stream,
>> like
>> > maybe an edge case when two records have exactly the
>> same
>> > timestamp? I'll have to stare at the code some more...
>> >
>> > Regardless, in order to reap the benefits of running the app with EOS,
>> you
>> > really have to also set your consumers to read_committed. Otherwise,
>> you'll
>> > be seeing output data from aborted (aka rolled-back) transactions, and
>> you
>> > miss the intended "exactly once" guarantee.
>> >
>> > Thanks,
>> > -John
>> >
>> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <pe...@gmail.com>
>> > wrote:
>> >
>> >> Hi John,
>> >>
>> >> Haven't been able to reinstate the demo yet, but I have been re-reading
>> >> the following scenario of yours....
>> >>
>> >> On 1/24/19 11:48 PM, Peter Levart wrote:
>> >> > Hi John,
>> >> >
>> >> > On 1/24/19 3:18 PM, John Roesler wrote:
>> >> >
>> >> >>
>> >> >> The reason is that, upon restart, the suppression buffer can only
>> >> >> "remember" what got sent & committed to its changelog topic before.
>> >> >>
>> >> >> The scenario I have in mind is:
>> >> >>
>> >> >> ...
>> >> >> * buffer state X
>> >> >> ...
>> >> >> * flush state X to buffer changelog
>> >> >> ...
>> >> >> * commit transaction T0; start new transaction T1
>> >> >> ...
>> >> >> * emit final result X (in uncommitted transaction T1)
>> >> >> ...
>> >> >> * crash before flushing to the changelog the fact that state X was
>> >> >> emitted.
>> >> >> Also, transaction T1 gets aborted, since we crash before committing.
>> >> >> ...
>> >> >> * restart, restoring state X again from the changelog (because the
>> emit
>> >> >> didn't get committed)
>> >> >> * start transaction T2
>> >> >> * emit final result X again (in uncommitted transaction T2)
>> >> >> ...
>> >> >> * commit transaction T2
>> >> >> ...
>> >> >>
>> >> >> So, the result gets emitted twice, but the first time is in an
>> aborted
>> >> >> transaction. This leads me to another clarifying question:
>> >> >>
>> >> >> Based on your first message, it seems like the duplicates you
>> observe
>> >> >> are
>> >> >> in the output topic. When you read the topic, do you configure your
>> >> >> consumer with "read committed" mode? If not, you'll see "results"
>> from
>> >> >> uncommitted transactions, which could explain the duplicates.
>> >>
>> >> ...and I was thinking that perhaps the right solution to the
>> suppression
>> >> problem would be to use transactional producers for the resulting
>> output
>> >> topic AND the store change-log. Is this possible? Does the compaction
>> of
>> >> the log on the brokers work for transactional producers as expected? In
>> >> that case, the sending of final result and the marking of that fact in
>> >> the store change log would together be an atomic operation.
>> >> That said, I think there's another problem with suppression which looks
>> >> like the suppression processor is already processing the input while the
>> >> state store has not been fully restored yet or something related... Is
>> >> this guaranteed not to happen?
>> >>
>> >> And now something unrelated I wanted to ask...
>> >>
>> >> I'm trying to create my own custom state store. From the API I can see
>> >> it is pretty straightforward. One thing that I don't quite understand
>> is
>> >> how Kafka Streams know whether to replay the whole change log after the
>> >> store registers itself or just a part of it and which part (from which
>> >> offset per partition). There doesn't seem to be any API point through
>> >> which the store could communicate this information back to Kafka
>> >> Streams. Is such bookkeeping performed outside the store? Does Kafka
>> >> Streams first invoke flush() on the store and then notes down the
>> >> offsets from the change log producer somewhere? So next time the store
>> >> is brought up, the log is only replayed from last noted down offset? So
>> >> it can happen that the store gets some log entries that have already
>> >> been incorporated in it (from the point of one flush before) but never
>> >> misses any... In any case there has to be an indication somewhere that
>> >> the store didn't survive and has to be rebuilt from scratch. How do
>> >> Kafka Streams detect that situation? By placing some marker file into
>> >> the directory reserved for store's local storage?
>> >>
>> >> Regards, Peter
>> >>
>> >>
>>
>
>
> --
> Santilli Jonathan
>
--
Santilli Jonathan
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by Jonathan Santilli <jo...@gmail.com>.
Hello John, hope you are well.
I have tested the 2.2 release candidate (although I know it has
been postponed).
I have been following this email thread because I think I am experiencing the
same issue. I reported it in an email to this list, and all the
details are also on SO (
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
).
After the test, the result is the same as before (at least in my case).
Already-processed records are passed to the output topic again, causing
data duplication:
...
2019-03-01 16:55:23,808 INFO [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] internals.StoreChangelogReader (StoreChangelogReader.java:221) - stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No checkpoint found for task 1_10 state store KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS turned on. *Reinitializing the task and restore its state from the beginning.*
...
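The log line above matches the restore rule described in this thread: with a checkpoint, replay the changelog from the checkpointed offset on top of the surviving local state, and without one, rebuild from the beginning. A toy model of that rule is sketched below, with an in-memory list standing in for the changelog topic. All names are hypothetical, and the real restore logic inside Kafka Streams is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Toy model only; class and method names are made up for illustration.
final class RestoreDemo {

    // Checkpoint present: resume from it. Checkpoint missing: start at offset 0.
    static long startOffset(OptionalLong checkpoint) {
        return checkpoint.isPresent() ? checkpoint.getAsLong() : 0L;
    }

    // Replays changelog records (key, value-or-null tombstone) from the start offset.
    // Without a checkpoint the local state is discarded and rebuilt from scratch.
    static Map<String, String> restore(Map<String, String> localState,
                                       List<String[]> changelog,
                                       OptionalLong checkpoint) {
        Map<String, String> state =
                checkpoint.isPresent() ? new HashMap<>(localState) : new HashMap<>();
        for (int i = (int) startOffset(checkpoint); i < changelog.size(); i++) {
            String[] record = changelog.get(i);
            if (record[1] == null) {
                state.remove(record[0]);
            } else {
                state.put(record[0], record[1]);
            }
        }
        return state;
    }

    static List<String[]> sampleChangelog() {
        List<String[]> log = new ArrayList<>();
        log.add(new String[] {"A", "1"});   // offset 0
        log.add(new String[] {"B", "1"});   // offset 1
        log.add(new String[] {"A", "2"});   // offset 2
        log.add(new String[] {"B", null});  // offset 3, tombstone
        return log;
    }

    public static void main(String[] args) {
        // Local state as it looked when offset 2 was checkpointed.
        Map<String, String> onDisk = new HashMap<>();
        onDisk.put("A", "1");
        onDisk.put("B", "1");

        Map<String, String> resumed = restore(onDisk, sampleChangelog(), OptionalLong.of(2));
        Map<String, String> rebuilt = restore(onDisk, sampleChangelog(), OptionalLong.empty());
        System.out.println("resumed=" + resumed + " rebuilt=" + rebuilt
                + " same=" + resumed.equals(rebuilt));
    }
}
```

Both paths end in the same state, so a missing checkpoint costs restore time but should not by itself change results.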
I was hoping this would be fixed, but that is not the case, at least for
me.
If you can, please take a look at the question on SO. I was in contact with
Matthias about it, and he also pointed me to the place where the
potential bug could be.
Please let me know your thoughts.
Cheers!
--
Jonathan
On Tue, Feb 26, 2019 at 5:23 PM John Roesler <jo...@confluent.io> wrote:
> Hi again, Peter,
>
> Just to close the loop about the bug in Suppress, we did get the (apparent)
> same report from a few other people:
> https://issues.apache.org/jira/browse/KAFKA-7895
>
> I also managed to reproduce the duplicate-result behavior, which could
> cause it to emit both intermediate results and duplicate final results.
>
> There's a patch for it in the 2.2 release candidate. Perhaps you can try it
> out and see if it resolves the issue for you?
>
> I'm backporting the fix to 2.1 as well, but I unfortunately missed the last
> 2.1 bugfix release.
>
> Thanks,
> -John
>
> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <jo...@confluent.io> wrote:
>
> > Hi Peter,
> >
> > Thanks for the replies.
> >
> > Regarding transactions:
> > Yes, actually, with EOS enabled, the changelog and the output topics are
> > all produced with the same transactional producer, within the same
> > transactions. So it should already be atomic.
> >
> > Regarding restore:
> > Streams doesn't put the store into service until the restore is
> completed,
> > so it should be guaranteed not to happen. But there's of course no
> > guarantee that I didn't mess something up. I'll take a hard look at it.
> >
> > Regarding restoration and offsets:
> > Your guess is correct: Streams tracks the latest stored offset outside of
> > the store implementation itself, specifically by writing a file (called a
> > Checkpoint File) in the state directory. If the file is there, it reads
> > that offset and restores from that point. If the file is missing, it
> > restores from the beginning of the stream. So it should "just work" for
> > you. Just for completeness, there have been several edge cases discovered
> > where this mechanism isn't completely safe, so in the case of EOS, I
> > believe we actually disregard that checkpoint file and the prior state
> and
> > always rebuild from the earliest offset in the changelog.
> >
> > Personally, I would like to see us provide the ability to store the
> > checkpoint inside the state store, so that checkpoint updates are
> > linearized correctly w.r.t. data updates, but I actually haven't
> mentioned
> > this thought to anyone until now ;)
> >
> > Finally, regarding your prior email:
> > Yes, I was thinking that the "wrong" output values might be part of
> > rolled-back transactions and therefore enabling read-committed mode on
> the
> > consumer might tell a different story than what you've seen to date.
> >
> > I'm honestly still baffled about those intermediate results that are
> > sneaking out. I wonder if it's something specific to your data stream,
> like
> > maybe an edge case when two records have exactly the
> same
> > timestamp? I'll have to stare at the code some more...
> >
> > Regardless, in order to reap the benefits of running the app with EOS,
> you
> > really have to also set your consumers to read_committed. Otherwise,
> you'll
> > be seeing output data from aborted (aka rolled-back) transactions, and
> you
> > miss the intended "exactly once" guarantee.
> >
> > Thanks,
> > -John
> >
> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <pe...@gmail.com>
> > wrote:
> >
> >> Hi John,
> >>
> >> Haven't been able to reinstate the demo yet, but I have been re-reading
> >> the following scenario of yours....
> >>
> >> On 1/24/19 11:48 PM, Peter Levart wrote:
> >> > Hi John,
> >> >
> >> > On 1/24/19 3:18 PM, John Roesler wrote:
> >> >
> >> >>
> >> >> The reason is that, upon restart, the suppression buffer can only
> >> >> "remember" what got sent & committed to its changelog topic before.
> >> >>
> >> >> The scenario I have in mind is:
> >> >>
> >> >> ...
> >> >> * buffer state X
> >> >> ...
> >> >> * flush state X to buffer changelog
> >> >> ...
> >> >> * commit transaction T0; start new transaction T1
> >> >> ...
> >> >> * emit final result X (in uncommitted transaction T1)
> >> >> ...
> >> >> * crash before flushing to the changelog the fact that state X was
> >> >> emitted.
> >> >> Also, transaction T1 gets aborted, since we crash before committing.
> >> >> ...
> >> >> * restart, restoring state X again from the changelog (because the
> emit
> >> >> didn't get committed)
> >> >> * start transaction T2
> >> >> * emit final result X again (in uncommitted transaction T2)
> >> >> ...
> >> >> * commit transaction T2
> >> >> ...
> >> >>
> >> >> So, the result gets emitted twice, but the first time is in an
> aborted
> >> >> transaction. This leads me to another clarifying question:
> >> >>
> >> >> Based on your first message, it seems like the duplicates you observe
> >> >> are
> >> >> in the output topic. When you read the topic, do you configure your
> >> >> consumer with "read committed" mode? If not, you'll see "results"
> from
> >> >> uncommitted transactions, which could explain the duplicates.
> >>
> >> ...and I was thinking that perhaps the right solution to the suppression
> >> problem would be to use transactional producers for the resulting output
> >> topic AND the store change-log. Is this possible? Does the compaction of
> >> the log on the brokers work for transactional producers as expected? In
> >> that case, the sending of final result and the marking of that fact in
> >> the store change log would together be an atomic operation.
> >> That said, I think there's another problem with suppression which looks
> >> like the suppression processor is already processing the input while the
> >> state store has not been fully restored yet or something related... Is
> >> this guaranteed not to happen?
> >>
> >> And now something unrelated I wanted to ask...
> >>
> >> I'm trying to create my own custom state store. From the API I can see
> >> it is pretty straightforward. One thing that I don't quite understand is
> >> how Kafka Streams know whether to replay the whole change log after the
> >> store registers itself or just a part of it and which part (from which
> >> offset per partition). There doesn't seem to be any API point through
> >> which the store could communicate this information back to Kafka
> >> Streams. Is such bookkeeping performed outside the store? Does Kafka
> >> Streams first invoke flush() on the store and then notes down the
> >> offsets from the change log producer somewhere? So next time the store
> >> is brought up, the log is only replayed from last noted down offset? So
> >> it can happen that the store gets some log entries that have already
> >> been incorporated in it (from the point of one flush before) but never
> >> misses any... In any case there has to be an indication somewhere that
> >> the store didn't survive and has to be rebuilt from scratch. How do
> >> Kafka Streams detect that situation? By placing some marker file into
> >> the directory reserved for store's local storage?
> >>
> >> Regards, Peter
> >>
> >>
>
--
Santilli Jonathan
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by John Roesler <jo...@confluent.io>.
Hi again, Peter,
Just to close the loop about the bug in Suppress, we did get what appears to be
the same report from a few other people:
https://issues.apache.org/jira/browse/KAFKA-7895
I also managed to reproduce the duplicate-result behavior, which could
cause it to emit both intermediate results and duplicate final results.
There's a patch for it in the 2.2 release candidate. Perhaps you can try it
out and see if it resolves the issue for you?
I'm backporting the fix to 2.1 as well, but I unfortunately missed the last
2.1 bugfix release.
Thanks,
-John
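For reference, the crash-and-restart scenario from the Jan 24 message quoted in this thread (emit X in T1, crash, restore, emit X again in T2) can be played through in a toy model. This illustrates that scenario only, not the KAFKA-7895 bug itself. The class below is hypothetical and models transactions as simple labels. Real brokers and consumers work differently in detail, for example a read_committed consumer also waits at the last stable offset.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of an output topic written by transactional producers.
final class AbortedTxnDemo {

    static final class Record {
        final String value;
        final String txn;

        Record(String value, String txn) {
            this.value = value;
            this.txn = txn;
        }
    }

    private final List<Record> outputTopic = new ArrayList<>();
    private final Set<String> committed = new HashSet<>();

    void emit(String value, String txn) {
        outputTopic.add(new Record(value, txn));
    }

    void commit(String txn) {
        committed.add(txn);
    }

    // readCommitted=true hides records whose transaction never committed.
    List<String> read(boolean readCommitted) {
        List<String> seen = new ArrayList<>();
        for (Record r : outputTopic) {
            if (!readCommitted || committed.contains(r.txn)) {
                seen.add(r.value);
            }
        }
        return seen;
    }

    static AbortedTxnDemo runScenario() {
        AbortedTxnDemo topic = new AbortedTxnDemo();
        topic.commit("T0");           // buffered state X is already in the changelog
        topic.emit("final X", "T1");  // emitted, then crash: T1 never commits
        topic.emit("final X", "T2");  // after restart, X is restored and emitted again
        topic.commit("T2");
        return topic;
    }

    public static void main(String[] args) {
        AbortedTxnDemo topic = runScenario();
        System.out.println("read_uncommitted sees: " + topic.read(false));
        System.out.println("read_committed sees:   " + topic.read(true));
    }
}
```

In this model the read_uncommitted view contains the final result twice and the read_committed view contains it once. On a real consumer the corresponding setting is isolation.level=read_committed (the default is read_uncommitted).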
On Fri, Jan 25, 2019 at 10:23 AM John Roesler <jo...@confluent.io> wrote:
> Hi Peter,
>
> Thanks for the replies.
>
> Regarding transactions:
> Yes, actually, with EOS enabled, the changelog and the output topics are
> all produced with the same transactional producer, within the same
> transactions. So it should already be atomic.
>
> Regarding restore:
> Streams doesn't put the store into service until the restore is completed,
> so it should be guaranteed not to happen. But there's of course no
> guarantee that I didn't mess something up. I'll take a hard look at it.
>
> Regarding restoration and offsets:
> Your guess is correct: Streams tracks the latest stored offset outside of
> the store implementation itself, specifically by writing a file (called a
> Checkpoint File) in the state directory. If the file is there, it reads
> that offset and restores from that point. If the file is missing, it
> restores from the beginning of the stream. So it should "just work" for
> you. Just for completeness, there have been several edge cases discovered
> where this mechanism isn't completely safe, so in the case of EOS, I
> believe we actually disregard that checkpoint file and the prior state and
> always rebuild from the earliest offset in the changelog.
>
> Personally, I would like to see us provide the ability to store the
> checkpoint inside the state store, so that checkpoint updates are
> linearized correctly w.r.t. data updates, but I actually haven't mentioned
> this thought to anyone until now ;)
>
> Finally, regarding your prior email:
> Yes, I was thinking that the "wrong" output values might be part of
> rolled-back transactions and therefore enabling read-committed mode on the
> consumer might tell a different story than what you've seen to date.
>
> I'm honestly still baffled about those intermediate results that are
> sneaking out. I wonder if it's something specific to your data stream, like
> maybe an edge case when two records have exactly the same
> timestamp? I'll have to stare at the code some more...
>
> Regardless, in order to reap the benefits of running the app with EOS, you
> really have to also set your consumers to read_committed. Otherwise, you'll
> be seeing output data from aborted (aka rolled-back) transactions, and you
> miss the intended "exactly once" guarantee.
>
> Thanks,
> -John
>
> On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <pe...@gmail.com>
> wrote:
>
>> Hi John,
>>
>> Haven't been able to reinstate the demo yet, but I have been re-reading
>> the following scenario of yours....
>>
>> On 1/24/19 11:48 PM, Peter Levart wrote:
>> > Hi John,
>> >
>> > On 1/24/19 3:18 PM, John Roesler wrote:
>> >
>> >>
>> >> The reason is that, upon restart, the suppression buffer can only
>> >> "remember" what got sent & committed to its changelog topic before.
>> >>
>> >> The scenario I have in mind is:
>> >>
>> >> ...
>> >> * buffer state X
>> >> ...
>> >> * flush state X to buffer changelog
>> >> ...
>> >> * commit transaction T0; start new transaction T1
>> >> ...
>> >> * emit final result X (in uncommitted transaction T1)
>> >> ...
>> >> * crash before flushing to the changelog the fact that state X was
>> >> emitted.
>> >> Also, transaction T1 gets aborted, since we crash before committing.
>> >> ...
>> >> * restart, restoring state X again from the changelog (because the emit
>> >> didn't get committed)
>> >> * start transaction T2
>> >> * emit final result X again (in uncommitted transaction T2)
>> >> ...
>> >> * commit transaction T2
>> >> ...
>> >>
>> >> So, the result gets emitted twice, but the first time is in an aborted
>> >> transaction. This leads me to another clarifying question:
>> >>
>> >> Based on your first message, it seems like the duplicates you observe
>> >> are
>> >> in the output topic. When you read the topic, do you configure your
>> >> consumer with "read committed" mode? If not, you'll see "results" from
>> >> uncommitted transactions, which could explain the duplicates.
>>
>> ...and I was thinking that perhaps the right solution to the suppression
>> problem would be to use transactional producers for the resulting output
>> topic AND the store change-log. Is this possible? Does the compaction of
>> the log on the brokers work for transactional producers as expected? In
>> that case, the sending of final result and the marking of that fact in
>> the store change log would together be an atomic operation.
>> That said, I think there's another problem with suppression which looks
>> like the suppression processor is already processing the input while the
>> state store has not been fully restored yet or something related... Is
>> this guaranteed not to happen?
>>
>> And now something unrelated I wanted to ask...
>>
>> I'm trying to create my own custom state store. From the API I can see
>> it is pretty straightforward. One thing that I don't quite understand is
>> how Kafka Streams know whether to replay the whole change log after the
>> store registers itself or just a part of it and which part (from which
>> offset per partition). There doesn't seem to be any API point through
>> which the store could communicate this information back to Kafka
>> Streams. Is such bookkeeping performed outside the store? Does Kafka
>> Streams first invoke flush() on the store and then notes down the
>> offsets from the change log producer somewhere? So next time the store
>> is brought up, the log is only replayed from last noted down offset? So
>> it can happen that the store gets some log entries that have already
>> been incorporated in it (from the point of one flush before) but never
>> misses any... In any case there has to be an indication somewhere that
>> the store didn't survive and has to be rebuilt from scratch. How do
>> Kafka Streams detect that situation? By placing some marker file into
>> the directory reserved for store's local storage?
>>
>> Regards, Peter
>>
>>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by John Roesler <jo...@confluent.io>.
Hi Peter,
Thanks for the replies.
Regarding transactions:
Yes, actually, with EOS enabled, the changelog and the output topics are
all produced with the same transactional producer, within the same
transactions. So it should already be atomic.
Regarding restore:
Streams doesn't put the store into service until the restore is completed,
so it should be guaranteed not to happen. But there's of course no
guarantee that I didn't mess something up. I'll take a hard look at it.
Regarding restoration and offsets:
Your guess is correct: Streams tracks the latest stored offset outside of
the store implementation itself, specifically by writing a file (called a
Checkpoint File) in the state directory. If the file is there, it reads
that offset and restores from that point. If the file is missing, it
restores from the beginning of the stream. So it should "just work" for
you. Just for completeness, there have been several edge cases discovered
where this mechanism isn't completely safe, so in the case of EOS, I
believe we actually disregard that checkpoint file and the prior state and
always rebuild from the earliest offset in the changelog.
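The restore decision described above can be sketched roughly as follows. This is only an illustration, not the Streams implementation: RestoreOffsetSketch, restoreStartOffset, and the in-memory map standing in for the checkpoint file are hypothetical names, and the EOS branch encodes the belief stated above rather than verified behavior.

```java
import java.util.Map;

// Hypothetical sketch of the restore decision; not actual Kafka Streams code.
final class RestoreOffsetSketch {

    // checkpoint: offsets as they would be read from the checkpoint file,
    //             keyed by changelog partition; empty when the file is missing.
    // Returns the changelog offset from which the store would be restored.
    static long restoreStartOffset(Map<Integer, Long> checkpoint,
                                   int partition,
                                   long earliestOffset,
                                   boolean eosEnabled) {
        if (eosEnabled) {
            // Assumption taken from the text: with EOS the checkpoint and the
            // prior local state are disregarded, so restore from the start.
            return earliestOffset;
        }
        Long checkpointed = checkpoint.get(partition);
        // No checkpoint entry: rebuild from the beginning of the changelog.
        return checkpointed != null ? checkpointed : earliestOffset;
    }

    public static void main(String[] args) {
        Map<Integer, Long> checkpoint = Map.of(0, 42L);
        System.out.println(restoreStartOffset(checkpoint, 0, 0L, false));
        System.out.println(restoreStartOffset(Map.of(), 0, 0L, false));
        System.out.println(restoreStartOffset(checkpoint, 0, 0L, true));
    }
}
```

The point of the sketch is that the store itself never reports an offset; the decision depends only on data kept outside the store.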
Personally, I would like to see us provide the ability to store the
checkpoint inside the state store, so that checkpoint updates are
linearized correctly w.r.t. data updates, but I actually haven't mentioned
this thought to anyone until now ;)
Finally, regarding your prior email:
Yes, I was thinking that the "wrong" output values might be part of
rolled-back transactions and therefore enabling read-committed mode on the
consumer might tell a different story than what you've seen to date.
I'm honestly still baffled about those intermediate results that are
sneaking out. I wonder if it's something specific to your data stream, like
maybe an edge case when two records have exactly the same
timestamp? I'll have to stare at the code some more...
Regardless, in order to reap the benefits of running the app with EOS, you
really have to also set your consumers to read_committed. Otherwise, you'll
be seeing output data from aborted (aka rolled-back) transactions, and you
miss the intended "exactly once" guarantee.
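For reference, a minimal sketch of the two settings involved. The config keys ("processing.guarantee", "isolation.level") and their values are the standard Kafka ones as of the 2.x releases; the class name, method names, and the plain java.util.Properties wiring are just assumptions for illustration.

```java
import java.util.Properties;

// Illustrative only: builds the properties one would pass to the Streams app
// and to a downstream consumer. Class and method names are hypothetical.
final class ReadCommittedConfigSketch {

    // Streams side: enable exactly-once processing.
    static Properties streamsProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    // Consumer side: hide records from aborted or still-open transactions.
    // The consumer default is "read_uncommitted".
    static Properties consumerProps(String groupId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps("my-app", "localhost:9092"));
        System.out.println(consumerProps("my-reader", "localhost:9092"));
    }
}
```

Both halves are needed: EOS on the Streams side only makes the writes transactional, while the isolation level decides what the reader sees.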
Thanks,
-John
On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <pe...@gmail.com> wrote:
> Hi John,
>
> Haven't been able to reinstate the demo yet, but I have been re-reading
> the following scenario of yours....
>
> On 1/24/19 11:48 PM, Peter Levart wrote:
> > Hi John,
> >
> > On 1/24/19 3:18 PM, John Roesler wrote:
> >
> >>
> >> The reason is that, upon restart, the suppression buffer can only
> >> "remember" what got sent & committed to its changelog topic before.
> >>
> >> The scenario I have in mind is:
> >>
> >> ...
> >> * buffer state X
> >> ...
> >> * flush state X to buffer changelog
> >> ...
> >> * commit transaction T0; start new transaction T1
> >> ...
> >> * emit final result X (in uncommitted transaction T1)
> >> ...
> >> * crash before flushing to the changelog the fact that state X was
> >> emitted.
> >> Also, transaction T1 gets aborted, since we crash before committing.
> >> ...
> >> * restart, restoring state X again from the changelog (because the emit
> >> didn't get committed)
> >> * start transaction T2
> >> * emit final result X again (in uncommitted transaction T2)
> >> ...
> >> * commit transaction T2
> >> ...
> >>
> >> So, the result gets emitted twice, but the first time is in an aborted
> >> transaction. This leads me to another clarifying question:
> >>
> >> Based on your first message, it seems like the duplicates you observe
> >> are
> >> in the output topic. When you read the topic, do you configure your
> >> consumer with "read committed" mode? If not, you'll see "results" from
> >> uncommitted transactions, which could explain the duplicates.
>
> ...and I was thinking that perhaps the right solution to the suppression
> problem would be to use transactional producers for the resulting output
> topic AND the store change-log. Is this possible? Does the compaction of
> the log on the brokers work for transactional producers as expected? In
> that case, the sending of final result and the marking of that fact in
> the store change log would together be an atomic operation.
> That said, I think there's another problem with suppression which looks
> like the suppression processor is already processing the input while the
> state store has not been fully restored yet or something related... Is
> this guaranteed not to happen?
>
> And now something unrelated I wanted to ask...
>
> I'm trying to create my own custom state store. From the API I can see
> it is pretty straightforward. One thing that I don't quite understand is
> how Kafka Streams know whether to replay the whole change log after the
> store registers itself or just a part of it and which part (from which
> offset per partition). There doesn't seem to be any API point through
> which the store could communicate this information back to Kafka
> Streams. Is such bookkeeping performed outside the store? Does Kafka
> Streams first invoke flush() on the store and then notes down the
> offsets from the change log producer somewhere? So next time the store
> is brought up, the log is only replayed from last noted down offset? So
> it can happen that the store gets some log entries that have already
> been incorporated in it (from the point of one flush before) but never
> misses any... In any case there has to be an indication somewhere that
> the store didn't survive and has to be rebuilt from scratch. How do
> Kafka Streams detect that situation? By placing some marker file into
> the directory reserved for store's local storage?
>
> Regards, Peter
>
>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by Peter Levart <pe...@gmail.com>.
Hi John,
Haven't been able to reinstate the demo yet, but I have been re-reading
the following scenario of yours....
On 1/24/19 11:48 PM, Peter Levart wrote:
> Hi John,
>
> On 1/24/19 3:18 PM, John Roesler wrote:
>
>>
>> The reason is that, upon restart, the suppression buffer can only
>> "remember" what got sent & committed to its changelog topic before.
>>
>> The scenario I have in mind is:
>>
>> ...
>> * buffer state X
>> ...
>> * flush state X to buffer changelog
>> ...
>> * commit transaction T0; start new transaction T1
>> ...
>> * emit final result X (in uncommitted transaction T1)
>> ...
>> * crash before flushing to the changelog the fact that state X was
>> emitted.
>> Also, transaction T1 gets aborted, since we crash before committing.
>> ...
>> * restart, restoring state X again from the changelog (because the emit
>> didn't get committed)
>> * start transaction T2
>> * emit final result X again (in uncommitted transaction T2)
>> ...
>> * commit transaction T2
>> ...
>>
>> So, the result gets emitted twice, but the first time is in an aborted
>> transaction. This leads me to another clarifying question:
>>
>> Based on your first message, it seems like the duplicates you observe
>> are
>> in the output topic. When you read the topic, do you configure your
>> consumer with "read committed" mode? If not, you'll see "results" from
>> uncommitted transactions, which could explain the duplicates.
...and I was thinking that perhaps the right solution to the suppression
problem would be to use transactional producers for the resulting output
topic AND the store change-log. Is this possible? Does the compaction of
the log on the brokers work for transactional producers as expected? In
that case, the sending of final result and the marking of that fact in
the store change log would together be an atomic operation.
That said, I think there's another problem with suppression which looks
like the suppression processor is already processing the input while the
state store has not been fully restored yet or something related... Is
this guaranteed not to happen?
And now something unrelated I wanted to ask...
I'm trying to create my own custom state store. From the API I can see
it is pretty straightforward. One thing that I don't quite understand is
how Kafka Streams knows whether to replay the whole change log after the
store registers itself or just a part of it, and which part (from which
offset per partition). There doesn't seem to be any API point through
which the store could communicate this information back to Kafka
Streams. Is such bookkeeping performed outside the store? Does Kafka
Streams first invoke flush() on the store and then note down the
offsets from the change log producer somewhere, so that the next time the
store is brought up, the log is only replayed from the last noted offset?
Then it can happen that the store gets some log entries that have already
been incorporated in it (from the point of one flush before) but never
misses any... In any case there has to be an indication somewhere that
the store didn't survive and has to be rebuilt from scratch. How does
Kafka Streams detect that situation? By placing some marker file into
the directory reserved for the store's local storage?
Regards, Peter
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by Peter Levart <pe...@gmail.com>.
Hi John,
On 1/24/19 3:18 PM, John Roesler wrote:
> Hi Peter,
>
> Thanks for the clarification.
>
> When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't
> think that Streams automatically registers a shutdown hook. In our examples
> and demos, we register a shutdown hook "outside" of streams (right next to
> the code that calls start() ).
> Unless I missed something, a SIGTERM would still cause Streams to exit
> abruptly, skipping flush and commit. This can cause apparent duplicates *if
> you're not using EOS or if you're reading uncommitted transactions*.
In fact, Spring, which I use to instantiate the KafkaStreams
object, does that:
   @Bean(initMethod = "start", destroyMethod = "close")
   public KafkaStreams processorStreams(...
..so when JVM gets SIGTERM, the shutdown hook that Spring installs shuts
down the ApplicationContext which calls all "destroyMethod"(s) on
registered Bean(s)...
And the duplicates are less apparent but still occur even in EOS mode...
But they are not actual duplicates. They are duplicate(s) only by
windowed keys, the values are different...
>
> The reason is that, upon restart, the suppression buffer can only
> "remember" what got sent & committed to its changelog topic before.
>
> The scenario I have in mind is:
>
> ...
> * buffer state X
> ...
> * flush state X to buffer changelog
> ...
> * commit transaction T0; start new transaction T1
> ...
> * emit final result X (in uncommitted transaction T1)
> ...
> * crash before flushing to the changelog the fact that state X was emitted.
> Also, transaction T1 gets aborted, since we crash before committing.
> ...
> * restart, restoring state X again from the changelog (because the emit
> didn't get committed)
> * start transaction T2
> * emit final result X again (in uncommitted transaction T2)
> ...
> * commit transaction T2
> ...
>
> So, the result gets emitted twice, but the first time is in an aborted
> transaction. This leads me to another clarifying question:
>
> Based on your first message, it seems like the duplicates you observe are
> in the output topic. When you read the topic, do you configure your
> consumer with "read committed" mode? If not, you'll see "results" from
> uncommitted transactions, which could explain the duplicates.
So when EOS is enabled, the output topics are used in a transactional
manner. The consumer of such a topic should then enable read_committed
semantics...
That would do if my problem was about seeing duplicates of final
windowing results. That is not my problem. My problem is that upon
restart of processor, I see some non-final window aggregations, followed
by final aggregations for the same windowed key. That's harder to
tolerate in an application. If it was just duplicates of the "correct"
aggregation I could ignore the 2nd and subsequent message for the same
windowed key, but if I first get a non-final aggregation, I cannot simply
ignore the 2nd occurrence of the same windowed key. I must cope with
"replacing the previous aggregation with a new version of it" in the app.
Meaning, that suppression of non-final results does not buy me anything
as it is not guaranteeing that.
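To make the difference concrete, here is a rough sketch of what the consuming application would have to do in that case: keep the latest value per windowed key and overwrite it when a newer one arrives, instead of simply dropping repeats. All names (WindowedUpsertSketch, WindowedKey, accept, current) are made up for illustration and are not part of any Kafka API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical consumer-side handling: last write wins per windowed key.
final class WindowedUpsertSketch {

    // A windowed key is the record key plus the window start timestamp.
    static final class WindowedKey {
        final String key;
        final long windowStartMs;

        WindowedKey(String key, long windowStartMs) {
            this.key = key;
            this.windowStartMs = windowStartMs;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof WindowedKey)) return false;
            WindowedKey other = (WindowedKey) o;
            return key.equals(other.key) && windowStartMs == other.windowStartMs;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, windowStartMs);
        }
    }

    private final Map<WindowedKey, Long> latest = new HashMap<>();

    // Stores the aggregate; returns true if it replaced an earlier value
    // for the same windowed key (i.e. the earlier one was not final).
    boolean accept(String key, long windowStartMs, long aggregate) {
        return latest.put(new WindowedKey(key, windowStartMs), aggregate) != null;
    }

    // Latest aggregate seen for the windowed key, or null if none.
    Long current(String key, long windowStartMs) {
        return latest.get(new WindowedKey(key, windowStartMs));
    }

    public static void main(String[] args) {
        WindowedUpsertSketch sink = new WindowedUpsertSketch();
        System.out.println(sink.accept("A", 0L, 3L)); // non-final value arrives first
        System.out.println(sink.accept("A", 0L, 5L)); // later value replaces it
        System.out.println(sink.current("A", 0L));
    }
}
```

With true final-only output, "ignore repeats" would be enough; the overwrite logic above is exactly the extra burden I was hoping suppression would remove.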
Is it possible that non-final windowed aggregations are emitted in some
scenario, but then such a transaction is rolled back, and I would not see
the non-final aggregations if I enabled read_committed isolation on the
consumer?
I think I'll have to reinstate the demo and try that...
Stay tuned.
Regards, Peter
>
> Likewise, if you were to attach a callback, like "foreach" downstream of
> the suppression, you would see duplicates in the case of a crash. Callbacks
> are a general "hole" in EOS, which I have some ideas to close, but that's a
> separate topic.
>
> There may still be something else going on, but I'm trying to start with
> the simpler explanations.
>
> Thanks again,
> -John
>
> Thanks,
> -John
>
> On Wed, Jan 23, 2019 at 5:11 AM Peter Levart <pe...@gmail.com> wrote:
>
>> Hi John,
>>
>> Sorry I haven't had time to prepare the minimal reproducer yet. I still
>> have plans to do it though...
>>
>> On 1/22/19 8:02 PM, John Roesler wrote:
>>> Hi Peter,
>>>
>>> Just to follow up on the actual bug, can you confirm whether:
>>> * when you say "restart", do you mean orderly shutdown and restart, or
>>> crash and restart?
>> I start it as SpringBoot application from IDEA and then stop it with the
>> red square button. It does initiate the shutdown sequence before
>> exiting... So I think it is by SIGTERM which initiates JVM shutdown
>> hook(s).
>>
>>> * have you tried this with EOS enabled? I can imagine some ways that
>> there
>>> could be duplicates, but they should be impossible with EOS enabled.
>> Yes, I have EOS enabled.
>>
>>> Thanks for your help,
>>> -John
>> Regards, Peter
>>
>>> On Mon, Jan 14, 2019 at 1:20 PM John Roesler <jo...@confluent.io> wrote:
>>>
>>>> Hi Peter,
>>>>
>>>> I see your train of thought, but the actual implementation of the
>>>> window store is structured differently from your mental model.
>>>> Unlike Key/Value stores, we know that the records in a window
>>>> store will "expire" on a regular schedule, and also that every single
>>>> record will eventually expire. With this in mind, we have implemented
>>>> an optimization to avoid a lot of compaction overhead in RocksDB, as
>>>> well as saving on range scans.
>>>>
>>>> Instead of storing everything in one database, we open several
>>>> databases and bucket windows into them. Then, when windows
>>>> expire, we just ignore the records (i.e., the API makes them
>> unreachable,
>>>> but we don't actually delete them). Once all the windows in a database
>>>> are expired, we just close and delete the whole database. Then, we open
>>>> a new one for new windows. If you look in the code, these databases are
>>>> called "segments".
>>>>
>>>> Thus, I don't think that you should attempt to use the built-in window
>>>> stores
>>>> as you described. Instead, it should be straightforward to implement
>> your
>>>> own StateStore with a layout that's more favorable to your desired
>>>> behavior.
>>>>
>>>> You should also be able to set up the change log the way you need as
>> well.
>>>> Explicitly removed entities also would get removed from the log as
>> well, if
>>>> it's a compacted log.
>>>>
>>>> Actually, what you're describing is *very* similar to the implementation
>>>> for suppress. I might actually suggest that you just copy the
>> suppression
>>>> implementation and adapt it to your needs, or at the very least, study
>>>> how it works. In doing so, you might actually discover the cause of the
>>>> bug yourself!
>>>>
>>>> I hope this helps, and thanks for your help,
>>>> -John
>>>>
>>>>
>>>> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Thank you very much for explaining how WindowStore works. I have some
>>>>> more questions...
>>>>>
>>>>> On 1/10/19 5:33 PM, John Roesler wrote:
>>>>>> Hi Peter,
>>>>>>
>>>>>> Regarding retention, I was not referring to log retention, but to the
>>>>>> window store retention.
>>>>>> Since a new window is created every second (for example), there are in
>>>>>> principle an unbounded
>>>>>> number of windows (the longer the application runs, the more windows
>>>>> there
>>>>>> are, with no end).
>>>>>> However, we obviously can't store an infinite amount of data, so the
>>>>> window
>>>>>> definition includes
>>>>>> a retention period. By default, this is 24 hours. After the retention
>>>>>> period elapses, all of the data
>>>>>> for the window is purged to make room for new windows.
>>>>> Right. Would the following work for example:
>>>>>
>>>>> - configure retention of WindowStore to be "infinite"
>>>>> - explicitly remove records from the store when windows are flushed out
>>>>> - configure WindowStore log topic for compacting
>>>>>
>>>>> Something like the following:
>>>>>
>>>>> Stores
>>>>>     .windowStoreBuilder(
>>>>>         Stores.persistentWindowStore(
>>>>>             storeName,
>>>>>             // retentionPeriod: effectively "infinite" (about 1000 years);
>>>>>             // Duration.of(1000L, ChronoUnit.YEARS) would throw, since
>>>>>             // YEARS is an estimated unit
>>>>>             Duration.ofDays(365_000L),
>>>>>             Duration.ofSeconds(10), // windowSize
>>>>>             false // retainDuplicates
>>>>>         ),
>>>>>         keySerde, valSerde
>>>>>     )
>>>>>     .withCachingEnabled()
>>>>>     .withLoggingEnabled(
>>>>>         Map.of(
>>>>>             TopicConfig.CLEANUP_POLICY_CONFIG,
>>>>>             TopicConfig.CLEANUP_POLICY_COMPACT
>>>>>         )
>>>>>     );
>>>>>
>>>>> Would in above scenario:
>>>>>
>>>>> - the on-disk WindowStore be kept bounded (there could be some very old
>>>>> entries in it but majority will be new - depending on the activity of
>>>>> particular input keys)
>>>>> - the log topic be kept bounded (explicitly removed entries would be
>>>>> removed from compacted log too)
>>>>>
>>>>> I'm moving away from DSL partly because I have some problems with
>>>>> suppression (which I hope we'll be able to fix) and partly because the
>>>>> DSL can't give me the complicated semantics that I need for the
>>>>> application at hand. I tried to capture what I need in a custom
>>>>> Transformer here:
>>>>>
>>>>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>>>>>
>>>>> Your knowledge of how WindowStore works would greatly help me decide if
>>>>> this is a workable idea.
>>>>>
>>>>>> So what I meant was that if you buffer some key "A" in window (Monday
>>>>>> 09:00:00) and then get
>>>>>> no further activity for A for over 24 hours, then when you do get that
>>>>> next
>>>>>> event for A, say at
>>>>>> (Tuesday 11:00:00), you'd do the scan but find nothing, since your
>>>>> buffered
>>>>>> state would already
>>>>>> have been purged from the store.
>>>>> Right. That would be the case when WindowStore was configured with
>>>>> default retention of 24 hours. A quick question: What does window size
>>>>> configuration for WindowStore (see above) do? Does it have to be
>>>>> synchronized with the size of windows stored in it?
>>>>>
>>>>>> The way I avoided this problem for Suppression was to organize the
>> data
>>>>> by
>>>>>> timestamp instead
>>>>>> of by key, so on *every* update I can search for all the keys that are
>>>>> old
>>>>>> enough and emit them.
>>>>>> I also don't use a window store, so I don't have to worry about the
>>>>>> retention time.
>>>>>>
>>>>>> To answer your question about the window store's topic, it configures
>> a
>>>>>> retention time the same
>>>>>> length as the store's retention time, (and the keys are the full
>>>>> windowed
>>>>>> key including the window
>>>>>> start time), so it'll have roughly the same size bound as the store
>>>>> itself.
>>>>>
>>>>> Would explicitly removed entries from WindowStore be removed from log
>>>>> too if it was a compacting log?
>>>>>
>>>>>> Back to the process of figuring out what might be wrong with
>>>>> Suppression, I
>>>>>> don't suppose you
>>>>>> would be able to file a Jira and upload a repro program? If not,
>> that's
>>>>> ok.
>>>>>> I haven't been able to
>>>>>> reproduce the bug yet, but it seems like it's happening somewhat
>>>>>> consistently for you, so I should
>>>>>> be able to get it to happen eventually.
>>>>>>
>>>>>> Thanks, and sorry again for the troubles.
>>>>>> -John
>>>>> I can prepare a minimal reproducer. No problem...
>>>>>
>>>>> Regards, Peter
>>>>>
>>>>>> On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com>
>>>>> wrote:
>>>>>>> On 1/8/19 12:57 PM, Peter Levart wrote:
>>>>>>>> Hi John,
>>>>>>>>
>>>>>>>> On 1/8/19 12:45 PM, Peter Levart wrote:
>>>>>>>>>> I looked at your custom transformer, and it looks almost correct to
>>>>>>>>>> me. The
>>>>>>>>>> only flaw seems to be that it only looks
>>>>>>>>>> for closed windows for the key currently being processed, which
>>>>>>>>>> means that
>>>>>>>>>> if you have key "A" buffered, but don't get another event for it
>>>>> for a
>>>>>>>>>> while after the window closes, you won't emit the final result.
>> This
>>>>>>>>>> might
>>>>>>>>>> actually take longer than the window retention period, in which
>>>>>>>>>> case, the
>>>>>>>>>> data would be deleted without ever emitting the final result.
>>>>>>>>> So in DSL case, the suppression works by flushing *all* of the
>> "ripe"
>>>>>>>>> windows in the whole buffer whenever a single event comes in with
>>>>>>>>> recent enough timestamp regardless of the key of that event?
>>>>>>>>>
>>>>>>>>> Is the buffer shared among processing tasks or does each task
>>>>>>>>> maintain its own private buffer that only contains its share of
>> data
>>>>>>>>> pertaining to assigned input partitions? In case the tasks are
>>>>>>>>> executed on several processing JVM(s) the buffer can't really be
>>>>>>>>> shared, right? In that case a single event can't flush all of the
>>>>>>>>> "ripe" windows, but just those that are contained in the task's
>> part
>>>>>>>>> of buffer...
>>>>>>>> Just a question about your comment above:
>>>>>>>>
>>>>>>>> /"This might actually take longer than the window retention period,
>> in
>>>>>>>> which case, the data would be deleted without ever emitting the
>> final
>>>>>>>> result"/
>>>>>>>>
>>>>>>>> Are you talking about the buffer log topic retention? Aren't log
>>>>>>>> topics configured to "compact" rather than "delete" messages? So the
>>>>>>>> last "version" of the buffer entry for a particular key should stay
>>>>>>>> forever? What are the keys in suppression buffer log topic? Are
>> they a
>>>>>>>> pair of (timestamp, key) ? Probably not since in that case the
>>>>>>>> compacted log would grow indefinitely...
>>>>>>>>
>>>>>>>> Another question:
>>>>>>>>
>>>>>>>> What are the keys in WindowStore's log topic? If the input keys to
>> the
>>>>>>>> processor that uses such WindowStore consist of a bounded set of
>>>>>>>> values (for example user ids), would compacted log of such
>> WindowStore
>>>>>>>> also be bounded?
>>>>>>> In case the key of WindowStore log topic is (timestamp, key) then
>> would
>>>>>>> explicitly deleting flushed entries from WindowStore (by putting null
>>>>>>> value into the store) keep the compacted log bounded? In other words,
>>>>>>> does WindowStore log topic support a special kind of "tombstone"
>>>>> message
>>>>>>> that effectively removes the key from the compacted log?
>>>>>>>
>>>>>>> In that case, my custom processor could keep entries in its
>> WindowStore
>>>>>>> for as long as needed, depending on the activity of a particular input
>>>>>>> key...
>>>>>>>
>>>>>>>> Regards, Peter
>>>>>>>>
>>>>>>>>
>>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by John Roesler <jo...@confluent.io>.
Hi Peter,
Thanks for the clarification.
When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't
think that Streams automatically registers a shutdown hook. In our examples
and demos, we register a shutdown hook "outside" of streams (right next to
the code that calls start() ).
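Roughly, the pattern looks like the sketch below. It uses only the JDK so it stands alone; ShutdownHookSketch and registerCloseOnShutdown are hypothetical names, and the assumption is that the streams instance is handed in as an AutoCloseable (for example as streams::close).

```java
// Hypothetical helper showing the "hook outside of Streams" pattern.
final class ShutdownHookSketch {

    // Registers a JVM shutdown hook that closes the given resource, so that
    // a SIGTERM leads to an orderly close instead of an abrupt exit.
    // Returns the hook thread so callers can remove it again if needed.
    static Thread registerCloseOnShutdown(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                // Nothing sensible left to do during shutdown except report it.
                e.printStackTrace();
            }
        }, "close-on-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        // Stand-in for a KafkaStreams instance; prints when the JVM exits.
        AutoCloseable fakeStreams = () -> System.out.println("closed cleanly");
        registerCloseOnShutdown(fakeStreams);
        System.out.println("started");
    }
}
```

In a real application the call would sit right next to start(), as in the examples and demos mentioned above.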
Unless I missed something, a SIGTERM would still cause Streams to exit
abruptly, skipping flush and commit. This can cause apparent duplicates *if
you're not using EOS or if you're reading uncommitted transactions*.
The reason is that, upon restart, the suppression buffer can only
"remember" what got sent & committed to its changelog topic before.
The scenario I have in mind is:
...
* buffer state X
...
* flush state X to buffer changelog
...
* commit transaction T0; start new transaction T1
...
* emit final result X (in uncommitted transaction T1)
...
* crash before flushing to the changelog the fact that state X was emitted.
Also, transaction T1 gets aborted, since we crash before committing.
...
* restart, restoring state X again from the changelog (because the emit
didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
...
* commit transaction T2
...
So, the result gets emitted twice, but the first time is in an aborted
transaction. This leads me to another clarifying question:
Based on your first message, it seems like the duplicates you observe are
in the output topic. When you read the topic, do you configure your
consumer with "read committed" mode? If not, you'll see "results" from
uncommitted transactions, which could explain the duplicates.
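The scenario can be played through with a toy model of the output topic. This is a deliberately simplified sketch, not Kafka client code: IsolationSketch and its methods are hypothetical, and a transaction that is never committed stands in for an aborted one.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of an output topic written within transactions.
final class IsolationSketch {

    private static final class Rec {
        final String txn;
        final String value;

        Rec(String txn, String value) {
            this.txn = txn;
            this.value = value;
        }
    }

    private final List<Rec> log = new ArrayList<>();
    private final Set<String> committed = new HashSet<>();

    // Appends a record as part of the given transaction.
    void append(String txn, String value) {
        log.add(new Rec(txn, value));
    }

    // Marks a transaction as committed; uncommitted ones count as aborted here.
    void commit(String txn) {
        committed.add(txn);
    }

    // readCommitted == false models "read_uncommitted": every record is visible.
    // readCommitted == true shows only records of committed transactions.
    List<String> read(boolean readCommitted) {
        List<String> out = new ArrayList<>();
        for (Rec r : log) {
            if (!readCommitted || committed.contains(r.txn)) {
                out.add(r.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        IsolationSketch topic = new IsolationSketch();
        topic.append("T1", "final result X"); // crash follows, T1 never commits
        topic.append("T2", "final result X"); // re-emitted after restart
        topic.commit("T2");
        System.out.println("read_uncommitted: " + topic.read(false));
        System.out.println("read_committed:   " + topic.read(true));
    }
}
```

In this model the uncommitted reader sees the result twice, while the committed reader sees it once, which is the effect I'm asking about.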
Likewise, if you were to attach a callback, like "foreach" downstream of
the suppression, you would see duplicates in the case of a crash. Callbacks
are a general "hole" in EOS, which I have some ideas to close, but that's a
separate topic.
There may still be something else going on, but I'm trying to start with
the simpler explanations.
Thanks again,
-John
Thanks,
-John
On Wed, Jan 23, 2019 at 5:11 AM Peter Levart <pe...@gmail.com> wrote:
> Hi John,
>
> Sorry I haven't had time to prepare the minimal reproducer yet. I still
> have plans to do it though...
>
> On 1/22/19 8:02 PM, John Roesler wrote:
> > Hi Peter,
> >
> > Just to follow up on the actual bug, can you confirm whether:
> > * when you say "restart", do you mean orderly shutdown and restart, or
> > crash and restart?
>
> I start it as SpringBoot application from IDEA and then stop it with the
> red square button. It does initiate the shutdown sequence before
> exiting... So I think it is by SIGTERM which initiates JVM shutdown
> hook(s).
>
> > * have you tried this with EOS enabled? I can imagine some ways that
> there
> > could be duplicates, but they should be impossible with EOS enabled.
>
> Yes, I have EOS enabled.
>
> >
> > Thanks for your help,
> > -John
>
> Regards, Peter
>
> >
> > On Mon, Jan 14, 2019 at 1:20 PM John Roesler <jo...@confluent.io> wrote:
> >
> >> Hi Peter,
> >>
> >> I see your train of thought, but the actual implementation of the
> >> window store is structured differently from your mental model.
> >> Unlike Key/Value stores, we know that the records in a window
> >> store will "expire" on a regular schedule, and also that every single
> >> record will eventually expire. With this in mind, we have implemented
> >> an optimization to avoid a lot of compaction overhead in RocksDB, as
> >> well as saving on range scans.
> >>
> >> Instead of storing everything in one database, we open several
> >> databases and bucket windows into them. Then, when windows
> >> expire, we just ignore the records (i.e., the API makes them unreachable,
> >> but we don't actually delete them). Once all the windows in a database
> >> are expired, we just close and delete the whole database. Then, we open
> >> a new one for new windows. If you look in the code, these databases are
> >> called "segments".
> >>
> >> Thus, I don't think that you should attempt to use the built-in window
> >> stores as you described. Instead, it should be straightforward to implement
> >> your own StateStore with a layout that's more favorable to your desired
> >> behavior.
> >>
> >> You should also be able to set up the change log the way you need as well.
> >> Explicitly removed entities also would get removed from the log as well, if
> >> it's a compacted log.
> >>
> >> Actually, what you're describing is *very* similar to the implementation
> >> for suppress. I might actually suggest that you just copy the suppression
> >> implementation and adapt it to your needs, or at the very least, study
> >> how it works. In doing so, you might actually discover the cause of the
> >> bug yourself!
> >>
> >> I hope this helps, and thanks for your help,
> >> -John
> >>
> >>
> >> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com>
> >> wrote:
> >>
> >>> Hi John,
> >>>
> >>> Thank you very much for explaining how WindowStore works. I have some
> >>> more questions...
> >>>
> >>> On 1/10/19 5:33 PM, John Roesler wrote:
> >>>> Hi Peter,
> >>>>
> >>>> Regarding retention, I was not referring to log retention, but to the
> >>>> window store retention.
> >>>> Since a new window is created every second (for example), there are in
> >>>> principle an unbounded number of windows (the longer the application
> >>>> runs, the more windows there are, with no end).
> >>>> However, we obviously can't store an infinite amount of data, so the
> >>>> window definition includes a retention period. By default, this is 24
> >>>> hours. After the retention period elapses, all of the data for the
> >>>> window is purged to make room for new windows.
> >>> Right. Would the following work for example:
> >>>
> >>> - configure retention of WindowStore to be "infinite"
> >>> - explicitly remove records from the store when windows are flushed out
> >>> - configure WindowStore log topic for compacting
> >>>
> >>> Something like the following:
> >>>
> >>> Stores
> >>>     .windowStoreBuilder(
> >>>         Stores.persistentWindowStore(
> >>>             storeName,
> >>>             // retentionPeriod: roughly 1000 years. Duration.of(1000L,
> >>>             // ChronoUnit.YEARS) throws, because YEARS is an estimated unit.
> >>>             Duration.ofDays(365_000L),
> >>>             Duration.ofSeconds(10), // windowSize
> >>>             false // retainDuplicates
> >>>         ),
> >>>         keySerde, valSerde
> >>>     )
> >>>     .withCachingEnabled()
> >>>     .withLoggingEnabled(
> >>>         Map.of(
> >>>             TopicConfig.CLEANUP_POLICY_CONFIG,
> >>>             TopicConfig.CLEANUP_POLICY_COMPACT
> >>>         )
> >>>     );
> >>>
> >>> Would in above scenario:
> >>>
> >>> - the on-disk WindowStore be kept bounded (there could be some very old
> >>> entries in it but majority will be new - depending on the activity of
> >>> particular input keys)
> >>> - the log topic be kept bounded (explicitly removed entries would be
> >>> removed from compacted log too)
> >>>
> >>> I'm moving away from DSL partly because I have some problems with
> >>> suppression (which I hope we'll be able to fix) and partly because the
> >>> DSL can't give me the complicated semantics that I need for the
> >>> application at hand. I tried to capture what I need in a custom
> >>> Transformer here:
> >>>
> >>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
> >>>
> >>> Your knowledge of how WindowStore works would greatly help me decide if
> >>> this is a workable idea.
> >>>
> >>>> So what I meant was that if you buffer some key "A" in window (Monday
> >>>> 09:00:00) and then get no further activity for A for over 24 hours, then
> >>>> when you do get that next event for A, say at (Tuesday 11:00:00), you'd
> >>>> do the scan but find nothing, since your buffered state would already
> >>>> have been purged from the store.
> >>> Right. That would be the case when WindowStore was configured with
> >>> default retention of 24 hours. A quick question: What does window size
> >>> configuration for WindowStore (see above) do? Does it have to be
> >>> synchronized with the size of windows stored in it?
> >>>
> >>>> The way I avoided this problem for Suppression was to organize the data
> >>>> by timestamp instead of by key, so on *every* update I can search for all
> >>>> the keys that are old enough and emit them.
> >>>> I also don't use a window store, so I don't have to worry about the
> >>>> retention time.
> >>>>
> >>>> To answer your question about the window store's topic, it configures a
> >>>> retention time the same length as the store's retention time (and the
> >>>> keys are the full windowed key, including the window start time), so
> >>>> it'll have roughly the same size bound as the store itself.
> >>>
> >>> Would explicitly removed entries from WindowStore be removed from log
> >>> too if it was a compacting log?
> >>>
> >>>> Back to the process of figuring out what might be wrong with Suppression,
> >>>> I don't suppose you would be able to file a Jira and upload a repro
> >>>> program? If not, that's ok. I haven't been able to reproduce the bug yet,
> >>>> but it seems like it's happening somewhat consistently for you, so I
> >>>> should be able to get it to happen eventually.
> >>>>
> >>>> Thanks, and sorry again for the troubles.
> >>>> -John
> >>> I can prepare a minimal reproducer. No problem...
> >>>
> >>> Regards, Peter
> >>>
> >>>> On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com>
> >>> wrote:
> >>>>> On 1/8/19 12:57 PM, Peter Levart wrote:
> >>>>>> Hi John,
> >>>>>>
> >>>>>> On 1/8/19 12:45 PM, Peter Levart wrote:
> >>>>>>>> I looked at your custom transformer, and it looks almost correct to
> >>>>>>>> me. The
> >>>>>>>> only flaw seems to be that it only looks
> >>>>>>>> for closed windows for the key currently being processed, which
> >>>>>>>> means that
> >>>>>>>> if you have key "A" buffered, but don't get another event for it
> >>> for a
> >>>>>>>> while after the window closes, you won't emit the final result.
> This
> >>>>>>>> might
> >>>>>>>> actually take longer than the window retention period, in which
> >>>>>>>> case, the
> >>>>>>>> data would be deleted without ever emitting the final result.
> >>>>>>> So in DSL case, the suppression works by flushing *all* of the
> "ripe"
> >>>>>>> windows in the whole buffer whenever a single event comes in with
> >>>>>>> recent enough timestamp regardless of the key of that event?
> >>>>>>>
> >>>>>>> Is the buffer shared among processing tasks or does each task
> >>>>>>> maintain its own private buffer that only contains its share of
> data
> >>>>>>> pertaining to assigned input partitions? In case the tasks are
> >>>>>>> executed on several processing JVM(s) the buffer can't really be
> >>>>>>> shared, right? In that case a single event can't flush all of the
> >>>>>>> "ripe" windows, but just those that are contained in the task's
> part
> >>>>>>> of buffer...
> >>>>>> Just a question about your comment above:
> >>>>>>
> >>>>>> /"This might actually take longer than the window retention period,
> in
> >>>>>> which case, the data would be deleted without ever emitting the
> final
> >>>>>> result"/
> >>>>>>
> >>>>>> Are you talking about the buffer log topic retention? Aren't log
> >>>>>> topics configured to "compact" rather than "delete" messages? So the
> >>>>>> last "version" of the buffer entry for a particular key should stay
> >>>>>> forever? What are the keys in suppression buffer log topic? Are
> they a
> >>>>>> pair of (timestamp, key) ? Probably not since in that case the
> >>>>>> compacted log would grow indefinitely...
> >>>>>>
> >>>>>> Another question:
> >>>>>>
> >>>>>> What are the keys in WindowStore's log topic? If the input keys to
> the
> >>>>>> processor that uses such WindowStore consist of a bounded set of
> >>>>>> values (for example user ids), would compacted log of such
> WindowStore
> >>>>>> also be bounded?
> >>>>> In case the key of WindowStore log topic is (timestamp, key) then
> would
> >>>>> explicitly deleting flushed entries from WindowStore (by putting null
> >>>>> value into the store) keep the compacted log bounded? In other words,
> >>>>> does WindowStore log topic support a special kind of "tombstone"
> >>> message
> >>>>> that effectively removes the key from the compacted log?
> >>>>>
> >>>>> In that case, my custom processor could keep entries in its
> WindowStore
> >>>>> for as long as needed, depending on the activity of a particular input
> >>>>> key...
> >>>>>
> >>>>>> Regards, Peter
> >>>>>>
> >>>>>>
> >>>
>
>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by Peter Levart <pe...@gmail.com>.
Hi John,
Sorry I haven't had time to prepare the minimal reproducer yet. I still
have plans to do it though...
On 1/22/19 8:02 PM, John Roesler wrote:
> Hi Peter,
>
> Just to follow up on the actual bug, can you confirm whether:
> * when you say "restart", do you mean orderly shutdown and restart, or
> crash and restart?
I start it as a Spring Boot application from IDEA and then stop it with the
red square button. It does initiate the shutdown sequence before
exiting... So I think it is stopped by SIGTERM, which triggers the JVM
shutdown hook(s).
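For readers who want to reproduce an orderly shutdown outside of an IDE, here is a minimal sketch of a JVM shutdown hook that closes an application when the process receives SIGTERM. The AutoCloseable in main is only a stand-in for the streams application, and the class and method names are invented for this example. Spring Boot typically registers a hook of its own, so this is not something the setup described above would need to add.

```java
public class GracefulShutdown {

    // Wraps the close call in a thread suitable for Runtime.addShutdownHook.
    public static Thread newCloseHook(AutoCloseable app) {
        return new Thread(() -> {
            try {
                app.close();
            } catch (Exception e) {
                System.err.println("Error while closing: " + e);
            }
        }, "app-shutdown-hook");
    }

    public static void main(String[] args) {
        // Stand-in for the real application object (an assumption of this sketch).
        AutoCloseable app = () -> System.out.println("closed cleanly");
        // The JVM runs registered hooks on normal exit and on SIGTERM,
        // but not on SIGKILL or a hard crash.
        Runtime.getRuntime().addShutdownHook(newCloseHook(app));
        System.out.println("running");
    }
}
```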
> * have you tried this with EOS enabled? I can imagine some ways that there
> could be duplicates, but they should be impossible with EOS enabled.
Yes, I have EOS enabled.
>
> Thanks for your help,
> -John
Regards, Peter
>
> On Mon, Jan 14, 2019 at 1:20 PM John Roesler <jo...@confluent.io> wrote:
>
>> Hi Peter,
>>
>> I see your train of thought, but the actual implementation of the
>> window store is structured differently from your mental model.
>> Unlike Key/Value stores, we know that the records in a window
>> store will "expire" on a regular schedule, and also that every single
>> record will eventually expire. With this in mind, we have implemented
>> an optimization to avoid a lot of compaction overhead in RocksDB, as
>> well as saving on range scans.
>>
>> Instead of storing everything in one database, we open several
>> databases and bucket windows into them. Then, when windows
>> expire, we just ignore the records (i.e., the API makes them unreachable,
>> but we don't actually delete them). Once all the windows in a database
>> are expired, we just close and delete the whole database. Then, we open
>> a new one for new windows. If you look in the code, these databases are
>> called "segments".
>>
>> Thus, I don't think that you should attempt to use the built-in window
>> stores as you described. Instead, it should be straightforward to implement
>> your own StateStore with a layout that's more favorable to your desired
>> behavior.
>>
>> You should also be able to set up the change log the way you need as well.
>> Explicitly removed entities also would get removed from the log as well, if
>> it's a compacted log.
>>
>> Actually, what you're describing is *very* similar to the implementation
>> for suppress. I might actually suggest that you just copy the suppression
>> implementation and adapt it to your needs, or at the very least, study
>> how it works. In doing so, you might actually discover the cause of the
>> bug yourself!
>>
>> I hope this helps, and thanks for your help,
>> -John
>>
>>
>> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com>
>> wrote:
>>
>>> Hi John,
>>>
>>> Thank you very much for explaining how WindowStore works. I have some
>>> more questions...
>>>
>>> On 1/10/19 5:33 PM, John Roesler wrote:
>>>> Hi Peter,
>>>>
>>>> Regarding retention, I was not referring to log retention, but to the
>>>> window store retention.
>>>> Since a new window is created every second (for example), there are in
>>>> principle an unbounded number of windows (the longer the application
>>>> runs, the more windows there are, with no end).
>>>> However, we obviously can't store an infinite amount of data, so the
>>>> window definition includes a retention period. By default, this is 24
>>>> hours. After the retention period elapses, all of the data for the
>>>> window is purged to make room for new windows.
>>> Right. Would the following work for example:
>>>
>>> - configure retention of WindowStore to be "infinite"
>>> - explicitly remove records from the store when windows are flushed out
>>> - configure WindowStore log topic for compacting
>>>
>>> Something like the following:
>>>
>>> Stores
>>>     .windowStoreBuilder(
>>>         Stores.persistentWindowStore(
>>>             storeName,
>>>             // retentionPeriod: roughly 1000 years. Duration.of(1000L,
>>>             // ChronoUnit.YEARS) throws, because YEARS is an estimated unit.
>>>             Duration.ofDays(365_000L),
>>>             Duration.ofSeconds(10), // windowSize
>>>             false // retainDuplicates
>>>         ),
>>>         keySerde, valSerde
>>>     )
>>>     .withCachingEnabled()
>>>     .withLoggingEnabled(
>>>         Map.of(
>>>             TopicConfig.CLEANUP_POLICY_CONFIG,
>>>             TopicConfig.CLEANUP_POLICY_COMPACT
>>>         )
>>>     );
>>>
>>> Would in above scenario:
>>>
>>> - the on-disk WindowStore be kept bounded (there could be some very old
>>> entries in it but majority will be new - depending on the activity of
>>> particular input keys)
>>> - the log topic be kept bounded (explicitly removed entries would be
>>> removed from compacted log too)
>>>
>>> I'm moving away from DSL partly because I have some problems with
>>> suppression (which I hope we'll be able to fix) and partly because the
>>> DSL can't give me the complicated semantics that I need for the
>>> application at hand. I tried to capture what I need in a custom
>>> Transformer here:
>>>
>>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>>>
>>> Your knowledge of how WindowStore works would greatly help me decide if
>>> this is a workable idea.
>>>
>>>> So what I meant was that if you buffer some key "A" in window (Monday
>>>> 09:00:00) and then get no further activity for A for over 24 hours, then
>>>> when you do get that next event for A, say at (Tuesday 11:00:00), you'd
>>>> do the scan but find nothing, since your buffered state would already
>>>> have been purged from the store.
>>> Right. That would be the case when WindowStore was configured with
>>> default retention of 24 hours. A quick question: What does window size
>>> configuration for WindowStore (see above) do? Does it have to be
>>> synchronized with the size of windows stored in it?
>>>
>>>> The way I avoided this problem for Suppression was to organize the data
>>>> by timestamp instead of by key, so on *every* update I can search for all
>>>> the keys that are old enough and emit them.
>>>> I also don't use a window store, so I don't have to worry about the
>>>> retention time.
>>>>
>>>> To answer your question about the window store's topic, it configures a
>>>> retention time the same length as the store's retention time (and the
>>>> keys are the full windowed key, including the window start time), so
>>>> it'll have roughly the same size bound as the store itself.
>>>
>>> Would explicitly removed entries from WindowStore be removed from log
>>> too if it was a compacting log?
>>>
>>>> Back to the process of figuring out what might be wrong with Suppression,
>>>> I don't suppose you would be able to file a Jira and upload a repro
>>>> program? If not, that's ok. I haven't been able to reproduce the bug yet,
>>>> but it seems like it's happening somewhat consistently for you, so I
>>>> should be able to get it to happen eventually.
>>>>
>>>> Thanks, and sorry again for the troubles.
>>>> -John
>>> I can prepare a minimal reproducer. No problem...
>>>
>>> Regards, Peter
>>>
>>>> On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com>
>>> wrote:
>>>>> On 1/8/19 12:57 PM, Peter Levart wrote:
>>>>>> Hi John,
>>>>>>
>>>>>> On 1/8/19 12:45 PM, Peter Levart wrote:
>>>>>>>> I looked at your custom transformer, and it looks almost correct to
>>>>>>>> me. The
>>>>>>>> only flaw seems to be that it only looks
>>>>>>>> for closed windows for the key currently being processed, which
>>>>>>>> means that
>>>>>>>> if you have key "A" buffered, but don't get another event for it
>>> for a
>>>>>>>> while after the window closes, you won't emit the final result. This
>>>>>>>> might
>>>>>>>> actually take longer than the window retention period, in which
>>>>>>>> case, the
>>>>>>>> data would be deleted without ever emitting the final result.
>>>>>>> So in DSL case, the suppression works by flushing *all* of the "ripe"
>>>>>>> windows in the whole buffer whenever a single event comes in with
>>>>>>> recent enough timestamp regardless of the key of that event?
>>>>>>>
>>>>>>> Is the buffer shared among processing tasks or does each task
>>>>>>> maintain its own private buffer that only contains its share of data
>>>>>>> pertaining to assigned input partitions? In case the tasks are
>>>>>>> executed on several processing JVM(s) the buffer can't really be
>>>>>>> shared, right? In that case a single event can't flush all of the
>>>>>>> "ripe" windows, but just those that are contained in the task's part
>>>>>>> of buffer...
>>>>>> Just a question about your comment above:
>>>>>>
>>>>>> /"This might actually take longer than the window retention period, in
>>>>>> which case, the data would be deleted without ever emitting the final
>>>>>> result"/
>>>>>>
>>>>>> Are you talking about the buffer log topic retention? Aren't log
>>>>>> topics configured to "compact" rather than "delete" messages? So the
>>>>>> last "version" of the buffer entry for a particular key should stay
>>>>>> forever? What are the keys in suppression buffer log topic? Are they a
>>>>>> pair of (timestamp, key) ? Probably not since in that case the
>>>>>> compacted log would grow indefinitely...
>>>>>>
>>>>>> Another question:
>>>>>>
>>>>>> What are the keys in WindowStore's log topic? If the input keys to the
>>>>>> processor that uses such WindowStore consist of a bounded set of
>>>>>> values (for example user ids), would compacted log of such WindowStore
>>>>>> also be bounded?
>>>>> In case the key of WindowStore log topic is (timestamp, key) then would
>>>>> explicitly deleting flushed entries from WindowStore (by putting null
>>>>> value into the store) keep the compacted log bounded? In other words,
>>>>> does WindowStore log topic support a special kind of "tombstone"
>>> message
>>>>> that effectively removes the key from the compacted log?
>>>>>
>>>>> In that case, my custom processor could keep entries in its WindowStore
>>>>> for as long as needed, depending on the activity of a particular input
>>>>> key...
>>>>>
>>>>>> Regards, Peter
>>>>>>
>>>>>>
>>>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by John Roesler <jo...@confluent.io>.
Hi Peter,
Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or
crash and restart?
* have you tried this with EOS enabled? I can imagine some ways that there
could be duplicates, but they should be impossible with EOS enabled.
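For reference, a minimal sketch of what "EOS enabled" means in configuration terms. The property "processing.guarantee" and the value "exactly_once" are the Kafka Streams settings as of the releases current when this thread was written (later releases added other values); the class name, method name, application id, and broker address are placeholders invented for this example.

```java
import java.util.Properties;

public class EosStreamsConfig {

    // Builds Streams properties with exactly-once processing turned on.
    // Both arguments are placeholders supplied by the caller.
    public static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The default is "at_least_once", under which a restart may
        // reprocess records and produce duplicate output.
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("suppress-repro", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee"));
    }
}
```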
Thanks for your help,
-John
On Mon, Jan 14, 2019 at 1:20 PM John Roesler <jo...@confluent.io> wrote:
> Hi Peter,
>
> I see your train of thought, but the actual implementation of the
> window store is structured differently from your mental model.
> Unlike Key/Value stores, we know that the records in a window
> store will "expire" on a regular schedule, and also that every single
> record will eventually expire. With this in mind, we have implemented
> an optimization to avoid a lot of compaction overhead in RocksDB, as
> well as saving on range scans.
>
> Instead of storing everything in one database, we open several
> databases and bucket windows into them. Then, when windows
> expire, we just ignore the records (i.e., the API makes them unreachable,
> but we don't actually delete them). Once all the windows in a database
> are expired, we just close and delete the whole database. Then, we open
> a new one for new windows. If you look in the code, these databases are
> called "segments".
>
> Thus, I don't think that you should attempt to use the built-in window
> stores as you described. Instead, it should be straightforward to implement
> your own StateStore with a layout that's more favorable to your desired
> behavior.
>
> You should also be able to set up the change log the way you need as well.
> Explicitly removed entities also would get removed from the log as well, if
> it's a compacted log.
>
> Actually, what you're describing is *very* similar to the implementation
> for suppress. I might actually suggest that you just copy the suppression
> implementation and adapt it to your needs, or at the very least, study
> how it works. In doing so, you might actually discover the cause of the
> bug yourself!
>
> I hope this helps, and thanks for your help,
> -John
>
>
> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com>
> wrote:
>
>> Hi John,
>>
>> Thank you very much for explaining how WindowStore works. I have some
>> more questions...
>>
>> On 1/10/19 5:33 PM, John Roesler wrote:
>> > Hi Peter,
>> >
>> > Regarding retention, I was not referring to log retention, but to the
>> > window store retention.
>> > Since a new window is created every second (for example), there are in
>> > principle an unbounded number of windows (the longer the application
>> > runs, the more windows there are, with no end).
>> > However, we obviously can't store an infinite amount of data, so the
>> > window definition includes a retention period. By default, this is 24
>> > hours. After the retention period elapses, all of the data for the
>> > window is purged to make room for new windows.
>>
>> Right. Would the following work for example:
>>
>> - configure retention of WindowStore to be "infinite"
>> - explicitly remove records from the store when windows are flushed out
>> - configure WindowStore log topic for compacting
>>
>> Something like the following:
>>
>> Stores
>>     .windowStoreBuilder(
>>         Stores.persistentWindowStore(
>>             storeName,
>>             // retentionPeriod: roughly 1000 years. Duration.of(1000L,
>>             // ChronoUnit.YEARS) throws, because YEARS is an estimated unit.
>>             Duration.ofDays(365_000L),
>>             Duration.ofSeconds(10), // windowSize
>>             false // retainDuplicates
>>         ),
>>         keySerde, valSerde
>>     )
>>     .withCachingEnabled()
>>     .withLoggingEnabled(
>>         Map.of(
>>             TopicConfig.CLEANUP_POLICY_CONFIG,
>>             TopicConfig.CLEANUP_POLICY_COMPACT
>>         )
>>     );
>>
>> Would in above scenario:
>>
>> - the on-disk WindowStore be kept bounded (there could be some very old
>> entries in it but majority will be new - depending on the activity of
>> particular input keys)
>> - the log topic be kept bounded (explicitly removed entries would be
>> removed from compacted log too)
>>
>> I'm moving away from DSL partly because I have some problems with
>> suppression (which I hope we'll be able to fix) and partly because the
>> DSL can't give me the complicated semantics that I need for the
>> application at hand. I tried to capture what I need in a custom
>> Transformer here:
>>
>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>>
>> Your knowledge of how WindowStore works would greatly help me decide if
>> this is a workable idea.
>>
>> >
>> > So what I meant was that if you buffer some key "A" in window (Monday
>> > 09:00:00) and then get no further activity for A for over 24 hours, then
>> > when you do get that next event for A, say at (Tuesday 11:00:00), you'd
>> > do the scan but find nothing, since your buffered state would already
>> > have been purged from the store.
>>
>> Right. That would be the case when WindowStore was configured with
>> default retention of 24 hours. A quick question: What does window size
>> configuration for WindowStore (see above) do? Does it have to be
>> synchronized with the size of windows stored in it?
>>
>> >
>> > The way I avoided this problem for Suppression was to organize the data
>> > by timestamp instead of by key, so on *every* update I can search for all
>> > the keys that are old enough and emit them.
>> > I also don't use a window store, so I don't have to worry about the
>> > retention time.
>> >
>> > To answer your question about the window store's topic, it configures a
>> > retention time the same length as the store's retention time (and the
>> > keys are the full windowed key, including the window start time), so
>> > it'll have roughly the same size bound as the store itself.
>>
>> Would explicitly removed entries from WindowStore be removed from log
>> too if it was a compacting log?
>>
>> >
>> > Back to the process of figuring out what might be wrong with Suppression,
>> > I don't suppose you would be able to file a Jira and upload a repro
>> > program? If not, that's ok. I haven't been able to reproduce the bug yet,
>> > but it seems like it's happening somewhat consistently for you, so I
>> > should be able to get it to happen eventually.
>> >
>> > Thanks, and sorry again for the troubles.
>> > -John
>>
>> I can prepare a minimal reproducer. No problem...
>>
>> Regards, Peter
>>
>> >
>> > On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com>
>> wrote:
>> >
>> >>
>> >> On 1/8/19 12:57 PM, Peter Levart wrote:
>> >>> Hi John,
>> >>>
>> >>> On 1/8/19 12:45 PM, Peter Levart wrote:
>> >>>>> I looked at your custom transformer, and it looks almost correct to
>> >>>>> me. The
>> >>>>> only flaw seems to be that it only looks
>> >>>>> for closed windows for the key currently being processed, which
>> >>>>> means that
>> >>>>> if you have key "A" buffered, but don't get another event for it
>> for a
>> >>>>> while after the window closes, you won't emit the final result. This
>> >>>>> might
>> >>>>> actually take longer than the window retention period, in which
>> >>>>> case, the
>> >>>>> data would be deleted without ever emitting the final result.
>> >>>> So in DSL case, the suppression works by flushing *all* of the "ripe"
>> >>>> windows in the whole buffer whenever a single event comes in with
>> >>>> recent enough timestamp regardless of the key of that event?
>> >>>>
>> >>>> Is the buffer shared among processing tasks or does each task
>> >>>> maintain its own private buffer that only contains its share of data
>> >>>> pertaining to assigned input partitions? In case the tasks are
>> >>>> executed on several processing JVM(s) the buffer can't really be
>> >>>> shared, right? In that case a single event can't flush all of the
>> >>>> "ripe" windows, but just those that are contained in the task's part
>> >>>> of buffer...
>> >>> Just a question about your comment above:
>> >>>
>> >>> /"This might actually take longer than the window retention period, in
>> >>> which case, the data would be deleted without ever emitting the final
>> >>> result"/
>> >>>
>> >>> Are you talking about the buffer log topic retention? Aren't log
>> >>> topics configured to "compact" rather than "delete" messages? So the
>> >>> last "version" of the buffer entry for a particular key should stay
>> >>> forever? What are the keys in suppression buffer log topic? Are they a
>> >>> pair of (timestamp, key) ? Probably not since in that case the
>> >>> compacted log would grow indefinitely...
>> >>>
>> >>> Another question:
>> >>>
>> >>> What are the keys in WindowStore's log topic? If the input keys to the
>> >>> processor that uses such WindowStore consist of a bounded set of
>> >>> values (for example user ids), would compacted log of such WindowStore
>> >>> also be bounded?
>> >> In case the key of WindowStore log topic is (timestamp, key) then would
>> >> explicitly deleting flushed entries from WindowStore (by putting null
>> >> value into the store) keep the compacted log bounded? In other words,
>> >> does WindowStore log topic support a special kind of "tombstone"
>> message
>> >> that effectively removes the key from the compacted log?
>> >>
>> >> In that case, my custom processor could keep entries in its WindowStore
>> >> for as long as needed, depending on the activity of a particular input
>> >> key...
>> >>
>> >>> Regards, Peter
>> >>>
>> >>>
>> >>
>>
>>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by John Roesler <jo...@confluent.io>.
Hi Peter,
I see your train of thought, but the actual implementation of the
window store is structured differently from your mental model.
Unlike Key/Value stores, we know that the records in a window
store will "expire" on a regular schedule, and also that every single
record will eventually expire. With this in mind, we have implemented
an optimization to avoid a lot of compaction overhead in RocksDB, as
well as saving on range scans.
Instead of storing everything in one database, we open several
databases and bucket windows into them. Then, when windows
expire, we just ignore the records (i.e., the API makes them unreachable,
but we don't actually delete them). Once all the windows in a database
are expired, we just close and delete the whole database. Then, we open
a new one for new windows. If you look in the code, these databases are
called "segments".
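The segment idea described above can be modeled in a few lines of plain Java. This is a simplified sketch, not the actual Kafka Streams code: each inner map stands in for one RocksDB instance, the class and method names are invented for this example, and stream time is approximated by the largest window start seen so far. It shows the two behaviors mentioned: expired records become unreachable through the API before they are physically gone, and a segment is dropped whole once everything in it has expired.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class SegmentedWindowStoreModel {
    private final long retentionMs;
    private final long segmentIntervalMs;
    // segment id -> records; each inner map models one database ("segment")
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    private long streamTimeMs = 0L; // assumes non-negative timestamps

    public SegmentedWindowStoreModel(long retentionMs, long segmentIntervalMs) {
        this.retentionMs = retentionMs;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    public void put(String key, long windowStartMs, String value) {
        streamTimeMs = Math.max(streamTimeMs, windowStartMs);
        if (!isExpired(windowStartMs)) {
            segments.computeIfAbsent(windowStartMs / segmentIntervalMs, id -> new HashMap<>())
                    .put(key + "@" + windowStartMs, value);
        }
        dropExpiredSegments();
    }

    // Expired windows are hidden here even if their segment still exists.
    public String fetch(String key, long windowStartMs) {
        if (isExpired(windowStartMs)) {
            return null;
        }
        Map<String, String> segment = segments.get(windowStartMs / segmentIntervalMs);
        return segment == null ? null : segment.get(key + "@" + windowStartMs);
    }

    public Set<Long> segmentIds() {
        return segments.keySet();
    }

    private boolean isExpired(long windowStartMs) {
        return windowStartMs < streamTimeMs - retentionMs;
    }

    // Segment id covers window starts in [id * interval, (id + 1) * interval).
    // It is deleted as a whole once even its newest possible window has expired.
    private void dropExpiredSegments() {
        long minLiveTimeMs = streamTimeMs - retentionMs;
        while (!segments.isEmpty()
                && (segments.firstKey() + 1) * segmentIntervalMs <= minLiveTimeMs) {
            segments.pollFirstEntry();
        }
    }

    public static void main(String[] args) {
        SegmentedWindowStoreModel store = new SegmentedWindowStoreModel(100L, 50L);
        store.put("A", 0L, "a0");
        store.put("A", 60L, "a60");
        store.put("B", 120L, "b120");
        System.out.println(store.fetch("A", 0L) + " " + store.segmentIds());
        store.put("B", 160L, "b160");
        System.out.println(store.segmentIds());
    }
}
```

In this model there is no per-record delete at all, which is why explicitly removing individual entries does not fit the built-in layout well.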
Thus, I don't think that you should attempt to use the built-in window
stores as you described. Instead, it should be straightforward to implement
your own StateStore with a layout that's more favorable to your desired
behavior.
You should also be able to set up the changelog the way you need.
Explicitly removed entries would also get removed from the log, if
it's a compacted log.
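To illustrate the point about compaction, here is a small plain-Java model of a compacted changelog, where a delete is written as a record with a null value (a "tombstone"). It is a sketch under simplifying assumptions, not broker code: the class and method names are invented, and a real broker compacts in the background and keeps tombstones around for a configurable time before dropping them.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactedChangelogModel {
    // Append-only list of (key, value) records; a null value is a tombstone.
    private final List<Map.Entry<String, String>> log = new ArrayList<>();

    public void append(String key, String value) {
        log.add(new AbstractMap.SimpleEntry<>(key, value));
    }

    // An explicit store delete is logged as a tombstone for that key.
    public void delete(String key) {
        append(key, null);
    }

    public int rawSize() {
        return log.size();
    }

    // What remains after compaction: the latest value per key, with
    // tombstoned keys removed entirely.
    public Map<String, String> compact() {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : log) {
            if (record.getValue() == null) {
                latest.remove(record.getKey());
            } else {
                latest.put(record.getKey(), record.getValue());
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        CompactedChangelogModel changelog = new CompactedChangelogModel();
        changelog.append("A@0", "1");
        changelog.append("A@0", "2");
        changelog.append("B@0", "5");
        changelog.delete("A@0");
        System.out.println(changelog.rawSize() + " raw records, compacted: " + changelog.compact());
    }
}
```

So the compacted log stays bounded as long as every key that is written is eventually deleted, whatever the key layout is.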
Actually, what you're describing is *very* similar to the implementation
for suppress. I might actually suggest that you just copy the suppression
implementation and adapt it to your needs, or at the very least, study
how it works. In doing so, you might actually discover the cause of the
bug yourself!
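As a starting point for such a study, here is a rough plain-Java sketch of the idea behind a buffer that is organized by time rather than by key, so that any incoming record can flush every closed window regardless of its key. It is not the actual suppression implementation: the class and method names are invented, the caller is assumed to pass in the window close time (window end plus any grace period) and the current stream time, and changelogging is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimeOrderedBufferModel {
    // window close time -> (key -> latest buffered value)
    private final TreeMap<Long, Map<String, String>> byCloseTime = new TreeMap<>();

    // Buffers the latest value for (key, window), then emits and removes every
    // buffered result whose window has closed as of streamTimeMs.
    public List<String> update(String key, long windowCloseMs, String value, long streamTimeMs) {
        byCloseTime.computeIfAbsent(windowCloseMs, t -> new TreeMap<>()).put(key, value);

        List<String> emitted = new ArrayList<>();
        // headMap(..., true) is a live view of all windows with close time <= stream time.
        Iterator<Map.Entry<Long, Map<String, String>>> closed =
                byCloseTime.headMap(streamTimeMs, true).entrySet().iterator();
        while (closed.hasNext()) {
            Map.Entry<Long, Map<String, String>> window = closed.next();
            for (Map.Entry<String, String> result : window.getValue().entrySet()) {
                emitted.add(result.getKey() + "@" + window.getKey() + "=" + result.getValue());
            }
            closed.remove(); // final results are emitted once, then forgotten
        }
        return emitted;
    }

    public int bufferedWindows() {
        return byCloseTime.size();
    }

    public static void main(String[] args) {
        TimeOrderedBufferModel buffer = new TimeOrderedBufferModel();
        System.out.println(buffer.update("A", 10L, "1", 5L));
        System.out.println(buffer.update("A", 10L, "2", 8L));
        // A record for key "B" advances stream time and flushes A's closed window.
        System.out.println(buffer.update("B", 20L, "7", 12L));
    }
}
```

Because the scan is by close time, key "A" does not need another event of its own to get its final result out, which is the flaw noted earlier in the per-key approach.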
I hope this helps, and thanks for your help,
-John
On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com> wrote:
> Hi John,
>
> Thank you very much for explaining how WindowStore works. I have some
> more questions...
>
> On 1/10/19 5:33 PM, John Roesler wrote:
> > Hi Peter,
> >
> > Regarding retention, I was not referring to log retention, but to the
> > window store retention.
> > Since a new window is created every second (for example), there are in
> > principle an unbounded number of windows (the longer the application
> > runs, the more windows there are, with no end).
> > However, we obviously can't store an infinite amount of data, so the
> > window definition includes a retention period. By default, this is 24
> > hours. After the retention period elapses, all of the data for the
> > window is purged to make room for new windows.
>
> Right. Would the following work for example:
>
> - configure retention of WindowStore to be "infinite"
> - explicitly remove records from the store when windows are flushed out
> - configure WindowStore log topic for compacting
>
> Something like the following:
>
> Stores
>     .windowStoreBuilder(
>         Stores.persistentWindowStore(
>             storeName,
>             // retentionPeriod: roughly 1000 years. Duration.of(1000L,
>             // ChronoUnit.YEARS) throws, because YEARS is an estimated unit.
>             Duration.ofDays(365_000L),
>             Duration.ofSeconds(10), // windowSize
>             false // retainDuplicates
>         ),
>         keySerde, valSerde
>     )
>     .withCachingEnabled()
>     .withLoggingEnabled(
>         Map.of(
>             TopicConfig.CLEANUP_POLICY_CONFIG,
>             TopicConfig.CLEANUP_POLICY_COMPACT
>         )
>     );
>
> Would in above scenario:
>
> - the on-disk WindowStore be kept bounded (there could be some very old
> entries in it but majority will be new - depending on the activity of
> particular input keys)
> - the log topic be kept bounded (explicitly removed entries would be
> removed from compacted log too)
>
> I'm moving away from DSL partly because I have some problems with
> suppression (which I hope we'll be able to fix) and partly because the
> DSL can't give me the complicated semantics that I need for the
> application at hand. I tried to capture what I need in a custom
> Transformer here:
>
> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>
> Your knowledge of how WindowStore works would greatly help me decide if
> this is a workable idea.
>
> >
> > So what I meant was that if you buffer some key "A" in window (Monday
> > 09:00:00) and then get
> > no further activity for A for over 24 hours, then when you do get that
> > next event for A, say at
> > (Tuesday 11:00:00), you'd do the scan but find nothing, since your
> > buffered state would already
> > have been purged from the store.
>
> Right. That would be the case when WindowStore was configured with
> default retention of 24 hours. A quick question: What does window size
> configuration for WindowStore (see above) do? Does it have to be
> synchronized with the size of windows stored in it?
>
> >
> > The way I avoided this problem for Suppression was to organize the data
> > by timestamp instead
> > of by key, so on *every* update I can search for all the keys that are
> > old enough and emit them.
> > I also don't use a window store, so I don't have to worry about the
> > retention time.
> >
> > To answer your question about the window store's topic, it configures a
> > retention time the same
> > length as the store's retention time (and the keys are the full
> > windowed key including the window
> > start time), so it'll have roughly the same size bound as the store
> > itself.
>
> Would explicitly removed entries from WindowStore be removed from log
> too if it was a compacting log?
>
> >
> > Back to the process of figuring out what might be wrong with
> > Suppression, I don't suppose you
> > would be able to file a Jira and upload a repro program? If not, that's
> > ok. I haven't been able to
> > reproduce the bug yet, but it seems like it's happening somewhat
> > consistently for you, so I should
> > be able to get it to happen eventually.
> >
> > Thanks, and sorry again for the troubles.
> > -John
>
> I can prepare a minimal reproducer. No problem...
>
> Regards, Peter
>
> >
> > On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com> wrote:
> >
> >>
> >> On 1/8/19 12:57 PM, Peter Levart wrote:
> >>> Hi John,
> >>>
> >>> On 1/8/19 12:45 PM, Peter Levart wrote:
> >>>>> I looked at your custom transformer, and it looks almost correct to
> >>>>> me. The
> >>>>> only flaw seems to be that it only looks
> >>>>> for closed windows for the key currently being processed, which
> >>>>> means that
> >>>>> if you have key "A" buffered, but don't get another event for it for
> >>>>> a while after the window closes, you won't emit the final result. This
> >>>>> might
> >>>>> actually take longer than the window retention period, in which
> >>>>> case, the
> >>>>> data would be deleted without ever emitting the final result.
> >>>> So in DSL case, the suppression works by flushing *all* of the "ripe"
> >>>> windows in the whole buffer whenever a single event comes in with
> >>>> recent enough timestamp regardless of the key of that event?
> >>>>
> >>>> Is the buffer shared among processing tasks or does each task
> >>>> maintain its own private buffer that only contains its share of data
> >>>> pertaining to assigned input partitions? In case the tasks are
> >>>> executed on several processing JVM(s) the buffer can't really be
> >>>> shared, right? In that case a single event can't flush all of the
> >>>> "ripe" windows, but just those that are contained in the task's part
> >>>> of buffer...
> >>> Just a question about your comment above:
> >>>
> >>> /"This might actually take longer than the window retention period, in
> >>> which case, the data would be deleted without ever emitting the final
> >>> result"/
> >>>
> >>> Are you talking about the buffer log topic retention? Aren't log
> >>> topics configured to "compact" rather than "delete" messages? So the
> >>> last "version" of the buffer entry for a particular key should stay
> >>> forever? What are the keys in suppression buffer log topic? Are they a
> >>> pair of (timestamp, key) ? Probably not since in that case the
> >>> compacted log would grow indefinitely...
> >>>
> >>> Another question:
> >>>
> >>> What are the keys in WindowStore's log topic? If the input keys to the
> >>> processor that uses such WindowStore consist of a bounded set of
> >>> values (for example user ids), would compacted log of such WindowStore
> >>> also be bounded?
> >> In case the key of WindowStore log topic is (timestamp, key) then would
> >> explicitly deleting flushed entries from WindowStore (by putting null
> >> value into the store) keep the compacted log bounded? In other words,
> >> does WindowStore log topic support a special kind of "tombstone" message
> >> that effectively removes the key from the compacted log?
> >>
> >> In that case, my custom processor could keep entries in its WindowStore
> >> for as long as needed, depending on the activity of a particular input
> >> key...
> >>
> >>> Regards, Peter
> >>>
> >>>
> >>
>
>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by John Roesler <jo...@confluent.io>.
Hi Peter,
Regarding retention, I was not referring to log retention, but to the
window store retention.
Since a new window is created every second (for example), there are in
principle an unbounded
number of windows (the longer the application runs, the more windows there
are, with no end).
However, we obviously can't store an infinite amount of data, so the window
definition includes
a retention period. By default, this is 24 hours. After the retention
period elapses, all of the data
for the window is purged to make room for new windows.
So what I meant was that if you buffer some key "A" in window (Monday
09:00:00) and then get
no further activity for A for over 24 hours, then when you do get that next
event for A, say at
(Tuesday 11:00:00), you'd do the scan but find nothing, since your buffered
state would already
have been purged from the store.
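The arithmetic of this example can be sketched in a few lines of plain Java. This is only a simplified illustration: the class and method names are hypothetical, the dates are picked so that they fall on a Monday and a Tuesday, and the real store's expiry bookkeeping is more involved than a single comparison.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class RetentionExample {
    /** True if a window that started at windowStart is older than the retention period at time now. */
    public static boolean isExpired(LocalDateTime windowStart, LocalDateTime now, Duration retention) {
        return Duration.between(windowStart, now).compareTo(retention) > 0;
    }

    public static void main(String[] args) {
        LocalDateTime monday = LocalDateTime.of(2019, 1, 7, 9, 0, 0);   // Monday 09:00:00
        LocalDateTime tuesday = LocalDateTime.of(2019, 1, 8, 11, 0, 0); // Tuesday 11:00:00
        // 26 hours have elapsed, which exceeds the default 24-hour retention.
        System.out.println(isExpired(monday, tuesday, Duration.ofHours(24)));
    }
}
```

With a key-driven scan, the next event for A arrives only after the buffered entry is already gone, so there is nothing left to emit.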
The way I avoided this problem for Suppression was to organize the data by
timestamp instead
of by key, so on *every* update I can search for all the keys that are old
enough and emit them.
I also don't use a window store, so I don't have to worry about the
retention time.
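A minimal sketch of the "organize by timestamp instead of by key" idea, using only JDK collections. It is not the actual Suppression implementation: the class name, the method signature, and the use of a precomputed window close time (window end plus grace) are all assumptions made for illustration, and late records are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: a buffer ordered by window close time rather than by key. */
public class TimeOrderedBuffer {
    // window close time (ms) -> latest value per key for windows closing at that time
    private final TreeMap<Long, Map<String, String>> byCloseTime = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE;

    /** Buffers the latest value for the key, then returns every buffered result whose window has closed. */
    public List<String> process(String key, String value, long recordTimestamp, long windowCloseTime) {
        // Stream time only moves forward, driven by the highest timestamp seen so far.
        streamTime = Math.max(streamTime, recordTimestamp);
        byCloseTime.computeIfAbsent(windowCloseTime, t -> new HashMap<>()).put(key, value);

        List<String> emitted = new ArrayList<>();
        // All close times <= stream time are "ripe", whatever key the current record has.
        Map<Long, Map<String, String>> ripe = byCloseTime.headMap(streamTime, true);
        for (Map<String, String> entries : ripe.values()) {
            entries.forEach((k, v) -> emitted.add(k + "=" + v));
        }
        ripe.clear(); // the view is backed by the TreeMap, so this drops the emitted entries
        return emitted;
    }
}
```

The point of the design is that a record for any key advances stream time and can therefore flush a closed window for key "A", even if "A" itself never receives another event.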
To answer your question about the window store's topic, it configures a
retention time the same
length as the store's retention time (and the keys are the full windowed
key including the window
start time), so it'll have roughly the same size bound as the store itself.
Back to the process of figuring out what might be wrong with Suppression, I
don't suppose you
would be able to file a Jira and upload a repro program? If not, that's ok.
I haven't been able to
reproduce the bug yet, but it seems like it's happening somewhat
consistently for you, so I should
be able to get it to happen eventually.
Thanks, and sorry again for the troubles.
-John
On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com> wrote:
>
>
> On 1/8/19 12:57 PM, Peter Levart wrote:
> > Hi John,
> >
> > On 1/8/19 12:45 PM, Peter Levart wrote:
> >>> I looked at your custom transformer, and it looks almost correct to
> >>> me. The
> >>> only flaw seems to be that it only looks
> >>> for closed windows for the key currently being processed, which
> >>> means that
> >>> if you have key "A" buffered, but don't get another event for it for a
> >>> while after the window closes, you won't emit the final result. This
> >>> might
> >>> actually take longer than the window retention period, in which
> >>> case, the
> >>> data would be deleted without ever emitting the final result.
> >>
> >> So in DSL case, the suppression works by flushing *all* of the "ripe"
> >> windows in the whole buffer whenever a single event comes in with
> >> recent enough timestamp regardless of the key of that event?
> >>
> >> Is the buffer shared among processing tasks or does each task
> >> maintain its own private buffer that only contains its share of data
> >> pertaining to assigned input partitions? In case the tasks are
> >> executed on several processing JVM(s) the buffer can't really be
> >> shared, right? In that case a single event can't flush all of the
> >> "ripe" windows, but just those that are contained in the task's part
> >> of buffer...
> >
> > Just a question about your comment above:
> >
> > /"This might actually take longer than the window retention period, in
> > which case, the data would be deleted without ever emitting the final
> > result"/
> >
> > Are you talking about the buffer log topic retention? Aren't log
> > topics configured to "compact" rather than "delete" messages? So the
> > last "version" of the buffer entry for a particular key should stay
> > forever? What are the keys in suppression buffer log topic? Are they a
> > pair of (timestamp, key) ? Probably not since in that case the
> > compacted log would grow indefinitely...
> >
> > Another question:
> >
> > What are the keys in WindowStore's log topic? If the input keys to the
> > processor that uses such WindowStore consist of a bounded set of
> > values (for example user ids), would compacted log of such WindowStore
> > also be bounded?
>
> In case the key of WindowStore log topic is (timestamp, key) then would
> explicitly deleting flushed entries from WindowStore (by putting null
> value into the store) keep the compacted log bounded? In other words,
> does WindowStore log topic support a special kind of "tombstone" message
> that effectively removes the key from the compacted log?
>
> In that case, my custom processor could keep entries in its WindowStore
> for as long as needed, depending on the activity of a particular input
> key...
>
> >
> > Regards, Peter
> >
> >
>
>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by Peter Levart <pe...@gmail.com>.
On 1/8/19 12:57 PM, Peter Levart wrote:
> Hi John,
>
> On 1/8/19 12:45 PM, Peter Levart wrote:
>>> I looked at your custom transformer, and it looks almost correct to
>>> me. The
>>> only flaw seems to be that it only looks
>>> for closed windows for the key currently being processed, which
>>> means that
>>> if you have key "A" buffered, but don't get another event for it for a
>>> while after the window closes, you won't emit the final result. This
>>> might
>>> actually take longer than the window retention period, in which
>>> case, the
>>> data would be deleted without ever emitting the final result.
>>
>> So in DSL case, the suppression works by flushing *all* of the "ripe"
>> windows in the whole buffer whenever a single event comes in with
>> recent enough timestamp regardless of the key of that event?
>>
>> Is the buffer shared among processing tasks or does each task
>> maintain its own private buffer that only contains its share of data
>> pertaining to assigned input partitions? In case the tasks are
>> executed on several processing JVM(s) the buffer can't really be
>> shared, right? In that case a single event can't flush all of the
>> "ripe" windows, but just those that are contained in the task's part
>> of buffer...
>
> Just a question about your comment above:
>
> /"This might actually take longer than the window retention period, in
> which case, the data would be deleted without ever emitting the final
> result"/
>
> Are you talking about the buffer log topic retention? Aren't log
> topics configured to "compact" rather than "delete" messages? So the
> last "version" of the buffer entry for a particular key should stay
> forever? What are the keys in suppression buffer log topic? Are they a
> pair of (timestamp, key) ? Probably not since in that case the
> compacted log would grow indefinitely...
>
> Another question:
>
> What are the keys in WindowStore's log topic? If the input keys to the
> processor that uses such WindowStore consist of a bounded set of
> values (for example user ids), would compacted log of such WindowStore
> also be bounded?
In case the key of WindowStore log topic is (timestamp, key) then would
explicitly deleting flushed entries from WindowStore (by putting null
value into the store) keep the compacted log bounded? In other words,
does WindowStore log topic support a special kind of "tombstone" message
that effectively removes the key from the compacted log?
In that case, my custom processor could keep entries in its WindowStore
for as long as needed, depending on the activity of a particular input key...
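To make the question concrete, here is a small plain-Java model of what a compacted topic converges to. The general Kafka behavior it models (compaction keeps the latest record per key, and a record with a null value acts as a tombstone for that key) is standard; whether a WindowStore actually writes such a tombstone to its log on a null put is exactly the open question and is only assumed here. The class name and the "key@windowStart" key format are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionSketch {
    /**
     * Replays {key, value} records in log order and returns what survives compaction:
     * the latest value per key, with keys dropped when their latest record is a tombstone (null value).
     */
    public static Map<String, String> compact(String[][] log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : log) {
            String key = record[0];
            String value = record[1];
            if (value == null) {
                latest.remove(key); // tombstone: the key disappears from the compacted view
            } else {
                latest.put(key, value);
            }
        }
        return latest;
    }
}
```

Under that assumption, explicitly deleting flushed (key, window start) entries would keep the compacted log bounded by the number of windows still buffered, not by the number of windows ever created.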
>
> Regards, Peter
>
>
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by Peter Levart <pe...@gmail.com>.
Hi John,
On 1/8/19 12:45 PM, Peter Levart wrote:
>> I looked at your custom transformer, and it looks almost correct to
>> me. The
>> only flaw seems to be that it only looks
>> for closed windows for the key currently being processed, which means
>> that
>> if you have key "A" buffered, but don't get another event for it for a
>> while after the window closes, you won't emit the final result. This
>> might
>> actually take longer than the window retention period, in which case,
>> the
>> data would be deleted without ever emitting the final result.
>
> So in DSL case, the suppression works by flushing *all* of the "ripe"
> windows in the whole buffer whenever a single event comes in with
> recent enough timestamp regardless of the key of that event?
>
> Is the buffer shared among processing tasks or does each task maintain
> its own private buffer that only contains its share of data pertaining
> to assigned input partitions? In case the tasks are executed on
> several processing JVM(s) the buffer can't really be shared, right? In
> that case a single event can't flush all of the "ripe" windows, but
> just those that are contained in the task's part of buffer...
Just a question about your comment above:
/"This might actually take longer than the window retention period, in
which case, the data would be deleted without ever emitting the final
result"/
Are you talking about the buffer log topic retention? Aren't log topics
configured to "compact" rather than "delete" messages? So the last
"version" of the buffer entry for a particular key should stay forever?
What are the keys in suppression buffer log topic? Are they a pair of
(timestamp, key) ? Probably not since in that case the compacted log
would grow indefinitely...
Another question:
What are the keys in WindowStore's log topic? If the input keys to the
processor that uses such WindowStore consist of a bounded set of values
(for example user ids), would compacted log of such WindowStore also be
bounded?
Regards, Peter
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted
Posted by Peter Levart <pe...@gmail.com>.
Hi John,
On 1/7/19 9:10 PM, John Roesler wrote:
> Hi Peter,
>
> Sorry, I just now have seen this thread.
>
> You asked if this behavior is unexpected, and the answer is yes.
> Suppressed.untilWindowCloses is intended to emit only the final result,
> regardless of restarts.
>
> You also asked how the suppression buffer can resume after a restart, since
> it's not persistent.
> The answer is the same as for in-memory stores. The state of the store (or
> buffer, in this case)
> is persisted to a changelog topic, which is re-read on restart to re-create
> the exact state prior to shutdown.
> "Persistent" in the store nomenclature refers only to "persistent on the
> local disk".
>
> Just to confirm your response regarding the buffer size:
> While it is better to use the public ("Suppressed.unbounded()") API, yes,
> your buffer was already unbounded.
>
> I looked at your custom transformer, and it looks almost correct to me. The
> only flaw seems to be that it only looks
> for closed windows for the key currently being processed, which means that
> if you have key "A" buffered, but don't get another event for it for a
> while after the window closes, you won't emit the final result. This might
> actually take longer than the window retention period, in which case, the
> data would be deleted without ever emitting the final result.
So in DSL case, the suppression works by flushing *all* of the "ripe"
windows in the whole buffer whenever a single event comes in with recent
enough timestamp regardless of the key of that event?
Is the buffer shared among processing tasks or does each task maintain
its own private buffer that only contains its share of data pertaining
to assigned input partitions? In case the tasks are executed on several
processing JVM(s) the buffer can't really be shared, right? In that case
a single event can't flush all of the "ripe" windows, but just those
that are contained in the task's part of buffer...
>
> You said you think it should be possible to get the DSL version working,
> and I agree, since this is exactly what it was designed for. Do you mind
> filing a bug in the "KAFKA" Jira project (
> https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to
> keep the investigation organized that way.
Will do that.
>
> In the mean time, I'll take another look at your logs above and try to
> reason about what could be wrong.
>
> Just one clarification... For example, you showed
>> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172], sum: 138902
>> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164
>
> Am I correct in thinking that the first, shorter list is the "incremental"
> version, and the second is the "final" version? I think so, but am confused
> by "INSTEAD OF".
It's the other way around. The first list (usually the longer one) is what
has just been consumed, and the second is what had been consumed before
that for the same key (I maintain a ConcurrentHashMap of consumed
entries in the test and execute: secondList = map.put(key, firstList)).
In the majority of cases, the consumed list is an incremental update of some
previous version of the list (not necessarily a direct descendant)
consumed before that. But as I said, I have also observed the final window
result before a processor restart and, after the restart, some earlier
non-final version of the window aggregation for the same key.
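The bookkeeping described above can be sketched roughly as follows. This is a reconstruction from the description, not the actual test code: the class and method names are hypothetical, and the log line is simplified (no thread name or sum).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConsumedResultTracker {
    // windowed key (for example "a@1545815260000/1545815262000") -> last consumed list
    private final Map<String, List<Integer>> consumed = new ConcurrentHashMap<>();

    /** Records the newly consumed list and flags any earlier result seen for the same windowed key. */
    public String onConsumed(String windowedKey, List<Integer> values) {
        // put() returns the previous mapping, or null if this is the first result for the key
        List<Integer> previous = consumed.put(windowedKey, values);
        String line = "Consumed: [" + windowedKey + "] -> " + values;
        return previous == null ? line : line + " INSTEAD OF " + previous;
    }
}
```

With correct suppression, every windowed key is consumed exactly once, so no line should ever contain "INSTEAD OF".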
May I also note that there is some "jitter" in the input timestamps
because I'm trying to model a real use case where there will be several
inputs to the system with only approximately synchronized clocks. The
jitter is kept well below the TimeWindow grace period so there should be
no events consumed by the processor that belong to windows that have
already been flushed.
Regards, Peter
>
> Thanks for the report,
> -John
>
>
>
> On Wed, Dec 26, 2018 at 3:21 AM Peter Levart <pe...@gmail.com> wrote:
>
>>
>> On 12/21/18 3:16 PM, Peter Levart wrote:
>>> I also see some results that are actual non-final window aggregations
>>> that precede the final aggregations. These non-final results are never
>>> emitted out of order (for example, no such non-final result would ever
>>> come after the final result for a particular key/window).
>> Absence of proof is not the proof of absence... And I have later
>> observed (using the DSL variant, not the custom Transformer) an
>> occurrence of a non-final result that was emitted after restart of
>> streams processor while the final result for the same key/window had
>> been emitted before the restart:
>>
>> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550,
>> 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
>> ...
>> ... restart ...
>> ...
>> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550]
>> INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648
>>
>>
>> So the app logic cannot even rely on a guarantee that results are ordered.
>> This is really not usable until the bug is fixed.
>>
>> Regards, Peter
>>
>>