Posted to users@kafka.apache.org by John Roesler <jo...@confluent.io> on 2019/01/07 20:10:46 UTC

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Hi Peter,

Sorry, I have only just now seen this thread.

You asked if this behavior is unexpected, and the answer is yes.
Suppressed.untilWindowCloses is intended to emit only the final result,
regardless of restarts.

You also asked how the suppression buffer can resume after a restart, since
it's not persistent.
The answer is the same as for in-memory stores. The state of the store (or
buffer, in this case) is persisted to a changelog topic, which is re-read on
restart to re-create the exact state prior to shutdown.
"Persistent" in the store nomenclature refers only to "persistent on the
local disk".
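
To make the restore path a bit more concrete, here is a minimal plain-Java
sketch of replaying a changelog to rebuild an in-memory buffer. It is only a
model under stated assumptions, not the Kafka Streams implementation: the
class ChangelogReplaySketch, the ChangelogRecord type, and the convention
that a null value marks an entry removed from the buffer are hypothetical
names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of changelog-backed state; not a Kafka Streams class.
final class ChangelogReplaySketch {

    // One changelog entry. A null value is treated as a tombstone (assumption).
    static final class ChangelogRecord {
        final String key;
        final String value;

        ChangelogRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Rebuilds the buffer by applying every changelog record in offset order.
    static Map<String, String> restore(List<ChangelogRecord> changelog) {
        Map<String, String> buffer = new LinkedHashMap<>();
        for (ChangelogRecord record : changelog) {
            if (record.value == null) {
                buffer.remove(record.key);            // entry left the buffer before shutdown
            } else {
                buffer.put(record.key, record.value); // latest value for the key wins
            }
        }
        return buffer;
    }

    // Small fixed changelog used for demonstration.
    static List<ChangelogRecord> sampleChangelog() {
        List<ChangelogRecord> log = new ArrayList<>();
        log.add(new ChangelogRecord("a", "1"));
        log.add(new ChangelogRecord("b", "5"));
        log.add(new ChangelogRecord("a", "2"));
        log.add(new ChangelogRecord("b", null));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(restore(sampleChangelog()));
    }
}
```

In this model nothing needs to live on local disk: the map is rebuilt
entirely from the log, which is the sense in which an in-memory store or
buffer survives a restart.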

Just to confirm your response regarding the buffer size:
While it is better to use the public ("Suppressed.BufferConfig.unbounded()")
API, yes, your buffer was already unbounded.

I looked at your custom transformer, and it looks almost correct to me. The
only flaw seems to be that it only looks for closed windows for the key
currently being processed, which means that if you have key "A" buffered, but
don't get another event for it for a while after the window closes, you won't
emit the final result. This might actually take longer than the window
retention period, in which case the data would be deleted without ever
emitting the final result.
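
One way to picture the difference is the plain-Java sketch below. It models a
buffer that, on every incoming record, advances the observed stream time and
emits every buffered window that has closed, not just the window of the key
being processed. Everything in it is an assumption made for illustration:
FinalResultBufferSketch, its process method, and the "key@windowEnd" entry
naming are hypothetical and are not part of the Kafka Streams API. A real
Transformer would presumably need something comparable, for example a
scheduled punctuation that scans the store.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a final-results buffer; not a Kafka Streams class.
final class FinalResultBufferSketch {

    private final long windowSizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;

    // "key@windowEnd" -> {windowEnd, running sum}, kept until the window closes.
    private final Map<String, long[]> buffered = new LinkedHashMap<>();

    FinalResultBufferSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Adds the value to the record's window, then emits ALL closed windows.
    // Timestamps are assumed to be non-negative.
    List<String> process(String key, long timestampMs, long value) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowEnd = (timestampMs / windowSizeMs + 1) * windowSizeMs;

        // Records for windows that are already closed are dropped.
        if (windowEnd + graceMs > streamTime) {
            long[] slot = buffered.computeIfAbsent(
                key + "@" + windowEnd, id -> new long[] {windowEnd, 0L});
            slot[1] += value;
        }

        // Scan every buffered window, regardless of which key just arrived.
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, long[]>> it = buffered.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, long[]> entry = it.next();
            if (entry.getValue()[0] + graceMs <= streamTime) {
                emitted.add(entry.getKey() + "=" + entry.getValue()[1]);
                it.remove();
            }
        }
        return emitted;
    }
}
```

With 2000 ms windows and no grace period, feeding ("a", 1000, 5), then
("a", 1500, 7), then ("b", 2500, 1) emits nothing for the first two calls and
"a@2000=12" on the third: the final result for "a" is released by a record
for "b", which is the case a per-key check would miss.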

You said you think it should be possible to get the DSL version working,
and I agree, since this is exactly what it was designed for. Do you mind
filing a bug in the "KAFKA" Jira project (
https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to
keep the investigation organized that way.

In the meantime, I'll take another look at your logs above and try to
reason about what could be wrong.

Just one clarification... For example, you showed
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
272, 548, 172], sum: 138902
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164

Am I correct in thinking that the first, shorter list is the "incremental"
version, and the second is the "final" version? I think so, but am confused
by "INSTEAD OF".

Thanks for the report,
-John



On Wed, Dec 26, 2018 at 3:21 AM Peter Levart <pe...@gmail.com> wrote:

>
>
> On 12/21/18 3:16 PM, Peter Levart wrote:
> > I also see some results that are actual non-final window aggregations
> > that precede the final aggregations. These non-final results are never
> > emitted out of order (for example, no such non-final result would ever
> > come after the final result for a particular key/window).
>
> Absence of proof is not proof of absence... And I have later
> observed (using the DSL variant, not the custom Transformer) an
> occurrence of a non-final result that was emitted after a restart of the
> streams processor, while the final result for the same key/window had
> been emitted before the restart:
>
> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550,
> 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
> ...
> ... restart ...
> ...
> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550]
> INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648
>
>
> So the app logic cannot even rely on a guarantee that results are
> ordered. This is really not usable until the bug is fixed.
>
> Regards, Peter
>
>

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Jonathan Santilli <jo...@gmail.com>.
Sure John, I will document it.


Thanks a lot for your reply.
--
Jonathan



On Tue, Mar 5, 2019 at 7:38 PM John Roesler <jo...@confluent.io> wrote:

> Hi Jonathan,
>
> Just a quick update: I have not been able to reproduce the duplicates issue
> with the 2.2 RC, even with a topology very similar to the one you included
> in your stackoverflow post.
>
> I think we should treat this as a new bug. Would you mind opening a new
> Jira bug ticket with some steps to reproduce the problem, and also exactly
> the behavior you observe?
>
> Thanks,
> -John
>
> On Mon, Mar 4, 2019 at 10:41 PM John Roesler <jo...@confluent.io> wrote:
>
> > Hi Jonathan,
> >
> > Sorry to hear that the feature is causing you trouble as well, and that
> > the 2.2 release candidate didn't seem to fix it.
> >
> > I'll try and do a repro based on the code in your SO post tomorrow.
> >
> > I don't think it's related to the duplicates, but that shutdown error is
> > puzzling. Can you print the topology (with topology.describe())? This
> > will tell us what is in task 1 (i.e., *1_*) of your program.
> >
> > Thanks,
> > -John
> >
> > On Fri, Mar 1, 2019 at 11:33 AM Jonathan Santilli <
> > jonathansantilli@gmail.com> wrote:
> >
> >> BTW, after stopping the app gracefully (Stream#close()), this error
> >> shows up repeatedly:
> >>
> >> 2019-03-01 17:18:07,819 WARN
> >> [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> >> internals.ProcessorStateManager (ProcessorStateManager.java:327) - task
> >> [0_0] Failed to write offset checkpoint file to
> >> [/tmp/kafka-stream/XXX/0_0/.checkpoint]
> >>
> >> java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp
> >> (No such file or directory)
> >>
> >> at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
> >> at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
> >> at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
> >> at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
> >> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
> >> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
> >> at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
> >> at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
> >> at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
> >> at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
> >> at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
> >> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]
> >>
> >>
> >> However, I have checked and the folder created starts with: *1_*
> >>
> >> ls -lha /tmp/kafka-stream/XXX/1_1
> >> total 8
> >> drwxr-xr-x   5 a  b   160B  1 Mar 17:18 .
> >> drwxr-xr-x  34 a  b   1.1K  1 Mar 17:15 ..
> >> -rw-r--r--   1 a  b   2.9K  1 Mar 17:18 .checkpoint
> >> -rw-r--r--   1 a  b     0B  1 Mar 16:05 .lock
> >> drwxr-xr-x   3 a  b    96B  1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-0000000005
> >>
> >>
> >>
> >> Cheers!
> >> --
> >> Jonathan
> >>
> >>
> >>
> >> On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <
> >> jonathansantilli@gmail.com>
> >> wrote:
> >>
> >> > Hello John, hope you are well.
> >> > I have tested the version 2.2 release candidate (although I know it
> >> > has been postponed).
> >> > I have been following this email thread because I think I am
> >> > experiencing the same issue. I have reported it in an email to this
> >> > list, and all the details are also on SO (
> >> > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> >> > ).
> >> >
> >> > After the test, the result is the same as before (at least in my
> >> > case): already processed records are passed again to the output topic,
> >> > causing data duplication:
> >> >
> >> > ...
> >> > 2019-03-01 16:55:23,808 INFO
> >> > [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> >> > internals.StoreChangelogReader (StoreChangelogReader.java:221) -
> >> > stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> >> > No checkpoint found for task 1_10 state store
> >> > KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> >> > XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS turned
> >> > on. *Reinitializing the task and restore its state from the beginning.*
> >> >
> >> > ...
> >> >
> >> >
> >> > I was hoping for this to be fixed, but that is not the case, at least
> >> > for me.
> >> >
> >> > If you can, please take a look at the question on SO. I was in contact
> >> > with Matthias about it; he also pointed me to the place where the
> >> > potential bug could be present.
> >> >
> >> > Please let me know your thoughts.
> >> >
> >> >
> >> > Cheers!
> >> > --
> >> > Jonathan
> >> >
> >> >
> >> > On Tue, Feb 26, 2019 at 5:23 PM John Roesler <jo...@confluent.io> wrote:
> >> >
> >> >> Hi again, Peter,
> >> >>
> >> >> Just to close the loop about the bug in Suppress, we did get
> >> >> (apparently) the same report from a few other people:
> >> >> https://issues.apache.org/jira/browse/KAFKA-7895
> >> >>
> >> >> I also managed to reproduce the duplicate-result behavior, which could
> >> >> cause it to emit both intermediate results and duplicate final results.
> >> >>
> >> >> There's a patch for it in the 2.2 release candidate. Perhaps you can
> >> >> try it out and see if it resolves the issue for you?
> >> >>
> >> >> I'm backporting the fix to 2.1 as well, but I unfortunately missed the
> >> >> last 2.1 bugfix release.
> >> >>
> >> >> Thanks,
> >> >> -John
> >> >>
> >> >> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <jo...@confluent.io> wrote:
> >> >>
> >> >> > Hi Peter,
> >> >> >
> >> >> > Thanks for the replies.
> >> >> >
> >> >> > Regarding transactions:
> >> >> > Yes, actually, with EOS enabled, the changelog and the output topics
> >> >> > are all produced with the same transactional producer, within the
> >> >> > same transactions. So it should already be atomic.
> >> >> >
> >> >> > Regarding restore:
> >> >> > Streams doesn't put the store into service until the restore is
> >> >> > completed, so it should be guaranteed not to happen. But there's of
> >> >> > course no guarantee that I didn't mess something up. I'll take a hard
> >> >> > look at it.
> >> >> >
> >> >> > Regarding restoration and offsets:
> >> >> > Your guess is correct: Streams tracks the latest stored offset
> >> >> > outside of the store implementation itself, specifically by writing a
> >> >> > file (called a Checkpoint File) in the state directory. If the file
> >> >> > is there, it reads that offset and restores from that point. If the
> >> >> > file is missing, it restores from the beginning of the stream. So it
> >> >> > should "just work" for you. Just for completeness, there have been
> >> >> > several edge cases discovered where this mechanism isn't completely
> >> >> > safe, so in the case of EOS, I believe we actually disregard that
> >> >> > checkpoint file and the prior state and always rebuild from the
> >> >> > earliest offset in the changelog.
> >> >> >
> >> >> > Personally, I would like to see us provide the ability to store the
> >> >> > checkpoint inside the state store, so that checkpoint updates are
> >> >> > linearized correctly w.r.t. data updates, but I actually haven't
> >> >> > mentioned this thought to anyone until now ;)
> >> >> >
> >> >> > Finally, regarding your prior email:
> >> >> > Yes, I was thinking that the "wrong" output values might be part of
> >> >> > rolled-back transactions, and therefore enabling read-committed mode
> >> >> > on the consumer might tell a different story than what you've seen to
> >> >> > date.
> >> >> >
> >> >> > I'm honestly still baffled about those intermediate results that are
> >> >> > sneaking out. I wonder if it's something specific to your data
> >> >> > stream, like maybe an edge case when two records have exactly the
> >> >> > same timestamp? I'll have to stare at the code some more...
> >> >> >
> >> >> > Regardless, in order to reap the benefits of running the app with
> >> >> > EOS, you really have to also set your consumers to read_committed.
> >> >> > Otherwise, you'll be seeing output data from aborted (aka
> >> >> > rolled-back) transactions, and you miss the intended "exactly once"
> >> >> > guarantee.
> >> >> >
> >> >> > Thanks,
> >> >> > -John
> >> >> >
> >> >> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <peter.levart@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> Hi John,
> >> >> >>
> >> >> >> Haven't been able to reinstate the demo yet, but I have been
> >> >> >> re-reading the following scenario of yours....
> >> >> >>
> >> >> >> On 1/24/19 11:48 PM, Peter Levart wrote:
> >> >> >> > Hi John,
> >> >> >> >
> >> >> >> > On 1/24/19 3:18 PM, John Roesler wrote:
> >> >> >> >
> >> >> >> >>
> >> >> >> >> The reason is that, upon restart, the suppression buffer can only
> >> >> >> >> "remember" what got sent & committed to its changelog topic
> >> >> >> >> before.
> >> >> >> >>
> >> >> >> >> The scenario I have in mind is:
> >> >> >> >>
> >> >> >> >> ...
> >> >> >> >> * buffer state X
> >> >> >> >> ...
> >> >> >> >> * flush state X to buffer changelog
> >> >> >> >> ...
> >> >> >> >> * commit transaction T0; start new transaction T1
> >> >> >> >> ...
> >> >> >> >> * emit final result X (in uncommitted transaction T1)
> >> >> >> >> ...
> >> >> >> >> * crash before flushing to the changelog the fact that state X
> >> >> >> >>   was emitted.
> >> >> >> >>   Also, transaction T1 gets aborted, since we crash before
> >> >> >> >>   committing.
> >> >> >> >> ...
> >> >> >> >> * restart, restoring state X again from the changelog (because
> >> >> >> >>   the emit didn't get committed)
> >> >> >> >> * start transaction T2
> >> >> >> >> * emit final result X again (in uncommitted transaction T2)
> >> >> >> >> ...
> >> >> >> >> * commit transaction T2
> >> >> >> >> ...
> >> >> >> >>
> >> >> >> >> So, the result gets emitted twice, but the first time is in an
> >> >> >> >> aborted transaction. This leads me to another clarifying question:
> >> >> >> >>
> >> >> >> >> Based on your first message, it seems like the duplicates you
> >> >> >> >> observe are in the output topic. When you read the topic, do you
> >> >> >> >> configure your consumer with "read committed" mode? If not, you'll
> >> >> >> >> see "results" from uncommitted transactions, which could explain
> >> >> >> >> the duplicates.
> >> >> >>
> >> >> >> ...and I was thinking that perhaps the right solution to the
> >> >> >> suppression problem would be to use transactional producers for the
> >> >> >> resulting output topic AND the store change-log. Is this possible?
> >> >> >> Does the compaction of the log on the brokers work for transactional
> >> >> >> producers as expected? In that case, the sending of the final result
> >> >> >> and the marking of that fact in the store change log would together
> >> >> >> be an atomic operation.
> >> >> >> That said, I think there's another problem with suppression, which
> >> >> >> looks like the suppression processor is already processing the input
> >> >> >> while the state store has not been fully restored yet, or something
> >> >> >> related... Is this guaranteed not to happen?
> >> >> >>
> >> >> >> And now something unrelated I wanted to ask...
> >> >> >>
> >> >> >> I'm trying to create my own custom state store. From the API I can
> >> >> >> see it is pretty straightforward. One thing that I don't quite
> >> >> >> understand is how Kafka Streams knows whether to replay the whole
> >> >> >> change log after the store registers itself or just a part of it,
> >> >> >> and which part (from which offset per partition). There doesn't seem
> >> >> >> to be any API point through which the store could communicate this
> >> >> >> information back to Kafka Streams. Is such bookkeeping performed
> >> >> >> outside the store? Does Kafka Streams first invoke flush() on the
> >> >> >> store and then note down the offsets from the change log producer
> >> >> >> somewhere? So next time the store is brought up, the log is only
> >> >> >> replayed from the last noted-down offset? So it can happen that the
> >> >> >> store gets some log entries that have already been incorporated in
> >> >> >> it (from the point of one flush before) but never misses any... In
> >> >> >> any case there has to be an indication somewhere that the store
> >> >> >> didn't survive and has to be rebuilt from scratch. How does Kafka
> >> >> >> Streams detect that situation? By placing some marker file into the
> >> >> >> directory reserved for the store's local storage?
> >> >> >>
> >> >> >> Regards, Peter
> >> >> >>
> >> >> >>
> >> >>
> >> >
> >> >
> >> > --
> >> > Santilli Jonathan
> >> >
> >>
> >>
> >> --
> >> Santilli Jonathan
> >>
> >
>


-- 
Santilli Jonathan

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi Jonathan,

Just a quick update: I have not been able to reproduce the duplicates issue
with the 2.2 RC, even with a topology very similar to the one you included
in your stackoverflow post.

I think we should treat this as a new bug. Would you mind opening a new
Jira bug ticket with some steps to reproduce the problem, and also exactly
the behavior you observe?

Thanks,
-John


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi Jonathan,

Sorry to hear that the feature is causing you trouble as well, and that the
2.2 release candidate didn't seem to fix it.

I'll try and do a repro based on the code in your SO post tomorrow.

I don't think it's related to the duplicates, but that shutdown error is
puzzling. Can you print the topology (with topology.describe())? This
will tell us what is in task 1 (i.e., *1_*) of your program.

Thanks,
-John

On Fri, Mar 1, 2019 at 11:33 AM Jonathan Santilli <
jonathansantilli@gmail.com> wrote:

> BTW, after stopping the app gracefully (Stream#close()), this error shows
> up repeatedly:
>
> 2019-03-01 17:18:07,819 WARN
> [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> internals.ProcessorStateManager (ProcessorStateManager.java:327) - task
> [0_0] Failed to write offset checkpoint file to
> [/tmp/kafka-stream/XXX/0_0/.checkpoint]
>
> java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp
> (No such file or directory)
>
> at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
> at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
> at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]
>
>
> However, I have checked, and the task folders that were created start with *1_*:
>
> ls -lha /tmp/kafka-stream/XXX/1_1
> total 8
> drwxr-xr-x   5 a  b   160B  1 Mar 17:18 .
> drwxr-xr-x  34 a  b   1.1K  1 Mar 17:15 ..
> -rw-r--r--   1 a  b   2.9K  1 Mar 17:18 .checkpoint
> -rw-r--r--   1 a  b     0B  1 Mar 16:05 .lock
> drwxr-xr-x   3 a  b    96B  1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-0000000005
>
>
>
> Cheers!
> --
> Jonathan

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Jonathan Santilli <jo...@gmail.com>.
BTW, after stopping the app gracefully (KafkaStreams#close()), this error shows
up repeatedly:

2019-03-01 17:18:07,819 WARN [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] internals.ProcessorStateManager (ProcessorStateManager.java:327) - task [0_0] Failed to write offset checkpoint file to [/tmp/kafka-stream/XXX/0_0/.checkpoint]
java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
    at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
    at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]


However, I have checked, and the task folders that were created start with *1_*:

ls -lha /tmp/kafka-stream/XXX/1_1
total 8
drwxr-xr-x   5 a  b   160B  1 Mar 17:18 .
drwxr-xr-x  34 a  b   1.1K  1 Mar 17:15 ..
-rw-r--r--   1 a  b   2.9K  1 Mar 17:18 .checkpoint
-rw-r--r--   1 a  b     0B  1 Mar 16:05 .lock
drwxr-xr-x   3 a  b    96B  1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-0000000005



Cheers!
--
Jonathan



On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <jo...@gmail.com>
wrote:

> Hello John, hope you are well.
> I have tested the version 2.2 release candidate (although I know it has
> been postponed).
> I have been following this email thread because I think I am experiencing
> the same issue. I reported it in an email to this list, and all the
> details are also on SO (
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> ).
>
> After the test, the result is the same as before (at least in my case).
> Already-processed records are passed to the output topic again, causing
> data duplication:
>
> ...
> 2019-03-01 16:55:23,808 INFO  [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> internals.StoreChangelogReader (StoreChangelogReader.java:221) -
> stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No
> checkpoint found for task 1_10 state store
> KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS turned on. *Reinitializing
> the task and restore its state from the beginning.*
>
> ...
>
>
> I was hoping this would be fixed, but that is not the case, at least for
> me.
>
> If you can, please take a look at the question on SO. I was in contact
> with Matthias about it, and he also pointed me to the place where the
> potential bug could be.
>
> Please let me know your thoughts.
>
>
> Cheers!
> --
> Jonathan


-- 
Santilli Jonathan

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Jonathan Santilli <jo...@gmail.com>.
Hello John, hope you are well.
I have tested the version 2.2 release candidate (although I know it has
been postponed).
I have been following this email thread because I think I am experiencing the
same issue. I reported it in an email to this list, and all the details are
also on SO (
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
).

After the test, the result is the same as before (at least in my case).
Already-processed records are passed to the output topic again, causing
data duplication:

...
2019-03-01 16:55:23,808 INFO
[XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
internals.StoreChangelogReader (StoreChangelogReader.java:221) -
stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No
checkpoint found for task 1_10 state store
KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS
turned on. *Reinitializing
the task and restore its state from the beginning.*

...
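
If I read that log line correctly, the restore decision looks roughly like
the sketch below: with a checkpoint, restoration resumes from the
checkpointed offset; without one, it starts from the beginning of the
changelog, and with EOS turned on the local state is reinitialized first.
This is only my simplified model of the message, not the actual
StoreChangelogReader code. RestorePlanSketch and its fields are made-up
names.

```java
import java.util.OptionalLong;

// Simplified model of the restore decision described by the log message.
// All names here are hypothetical; this is not Kafka Streams code.
public final class RestorePlanSketch {
    final long startOffset;       // changelog offset where restoration starts
    final boolean wipeLocalState; // whether local state is discarded first

    private RestorePlanSketch(long startOffset, boolean wipeLocalState) {
        this.startOffset = startOffset;
        this.wipeLocalState = wipeLocalState;
    }

    static RestorePlanSketch plan(OptionalLong checkpoint,
                                  long changelogStart,
                                  boolean eosEnabled) {
        if (checkpoint.isPresent()) {
            // Checkpoint found: local state is trusted up to that offset.
            return new RestorePlanSketch(checkpoint.getAsLong(), false);
        }
        // No checkpoint: replay the whole changelog. Assumption: only with
        // EOS is the existing local state wiped before replaying.
        return new RestorePlanSketch(changelogStart, eosEnabled);
    }

    public static void main(String[] args) {
        RestorePlanSketch p = plan(OptionalLong.empty(), 0L, true);
        System.out.println("start=" + p.startOffset + " wipe=" + p.wipeLocalState);
    }
}
```

In my case the checkpoint is never found for task 1_10, so every restart
takes the second branch.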


I was hoping this would be fixed, but that is not the case, at least for
me.

If you can, please take a look at the question on SO. I was in contact with
Matthias about it, and he also pointed me to the place where the
potential bug could be.

Please let me know your thoughts.


Cheers!
--
Jonathan


On Tue, Feb 26, 2019 at 5:23 PM John Roesler <jo...@confluent.io> wrote:

> Hi again, Peter,
>
> Just to close the loop about the bug in Suppress, we did get (apparently)
> the same report from a few other people:
> https://issues.apache.org/jira/browse/KAFKA-7895
>
> I also managed to reproduce the duplicate-result behavior, which could
> cause it to emit both intermediate results and duplicate final results.
>
> There's a patch for it in the 2.2 release candidate. Perhaps you can try it
> out and see if it resolves the issue for you?
>
> I'm backporting the fix to 2.1 as well, but I unfortunately missed the last
> 2.1 bugfix release.
>
> Thanks,
> -John


-- 
Santilli Jonathan

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi again, Peter,

Just to close the loop about the bug in Suppress, we did get (apparently)
the same report from a few other people:
https://issues.apache.org/jira/browse/KAFKA-7895

I also managed to reproduce the duplicate-result behavior, which could
cause it to emit both intermediate results and duplicate final results.

There's a patch for it in the 2.2 release candidate. Perhaps you can try it
out and see if it resolves the issue for you?

I'm backporting the fix to 2.1 as well, but I unfortunately missed the last
2.1 bugfix release.

Thanks,
-John

On Fri, Jan 25, 2019 at 10:23 AM John Roesler <jo...@confluent.io> wrote:

> Hi Peter,
>
> Thanks for the replies.
>
> Regarding transactions:
> Yes, actually, with EOS enabled, the changelog and the output topics are
> all produced with the same transactional producer, within the same
> transactions. So it should already be atomic.
>
> Regarding restore:
> Streams doesn't put the store into service until the restore is completed,
> so it should be guaranteed not to happen. But there's of course no
> guarantee that I didn't mess something up. I'll take a hard look at it.
>
> Regarding restoration and offsets:
> Your guess is correct: Streams tracks the latest stored offset outside of
> the store implementation itself, specifically by writing a file (called a
> Checkpoint File) in the state directory. If the file is there, it reads
> that offset and restores from that point. If the file is missing, it
> restores from the beginning of the changelog. So it should "just work" for
> you. Just for completeness, there have been several edge cases discovered
> where this mechanism isn't completely safe, so in the case of EOS, I
> believe we actually disregard that checkpoint file and the prior state and
> always rebuild from the earliest offset in the changelog.
>
> Personally, I would like to see us provide the ability to store the
> checkpoint inside the state store, so that checkpoint updates are
> linearized correctly w.r.t. data updates, but I actually haven't mentioned
> this thought to anyone until now ;)
>
> Finally, regarding your prior email:
> Yes, I was thinking that the "wrong" output values might be part of
> rolled-back transactions and therefore enabling read-committed mode on the
> consumer might tell a different story than what you've seen to date.
>
> I'm honestly still baffled about those intermediate results that are
> sneaking out. I wonder if it's something specific to your data stream, like
> maybe if there is maybe an edge case when two records have exactly the same
> timestamp? I'll have to stare at the code some more...
>
> Regardless, in order to reap the benefits of running the app with EOS, you
> really have to also set your consumers to read_committed. Otherwise, you'll
> be seeing output data from aborted (aka rolled-back) transactions, and you
> miss the intended "exactly once" guarantee.
>
> Thanks,
> -John
>
> On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <pe...@gmail.com>
> wrote:
>
>> Hi John,
>>
>> Haven't been able to reinstate the demo yet, but I have been re-reading
>> the following scenario of yours....
>>
>> On 1/24/19 11:48 PM, Peter Levart wrote:
>> > Hi John,
>> >
>> > On 1/24/19 3:18 PM, John Roesler wrote:
>> >
>> >>
>> >> The reason is that, upon restart, the suppression buffer can only
>> >> "remember" what got sent & committed to its changelog topic before.
>> >>
>> >> The scenario I have in mind is:
>> >>
>> >> ...
>> >> * buffer state X
>> >> ...
>> >> * flush state X to buffer changelog
>> >> ...
>> >> * commit transaction T0; start new transaction T1
>> >> ...
>> >> * emit final result X (in uncommitted transaction T1)
>> >> ...
>> >> * crash before flushing to the changelog the fact that state X was
>> >> emitted.
>> >> Also, transaction T1 gets aborted, since we crash before committing.
>> >> ...
>> >> * restart, restoring state X again from the changelog (because the emit
>> >> didn't get committed)
>> >> * start transaction T2
>> >> * emit final result X again (in uncommitted transaction T2)
>> >> ...
>> >> * commit transaction T2
>> >> ...
>> >>
>> >> So, the result gets emitted twice, but the first time is in an aborted
>> >> transaction. This leads me to another clarifying question:
>> >>
>> >> Based on your first message, it seems like the duplicates you observe
>> >> are
>> >> in the output topic. When you read the topic, do you configure your
>> >> consumer with "read committed" mode? If not, you'll see "results" from
>> >> uncommitted transactions, which could explain the duplicates.
>>
>> ...and I was thinking that perhaps the right solution to the suppression
>> problem would be to use transactional producers for the resulting output
>> topic AND the store change-log. Is this possible? Does the compaction of
>> the log on the brokers work for transactional producers as expected? In
>> that case, the sending of final result and the marking of that fact in
>> the store change log would together be an atomic operation.
>> That said, I think there's another problem with suppression which looks
>> like the suppression processor is already processing the input while the
>> state store has not been fully restored yet or something related... Is
>> this guaranteed not to happen?
>>
>> And now something unrelated I wanted to ask...
>>
>> I'm trying to create my own custom state store. From the API I can see
>> it is pretty straightforward. One thing that I don't quite understand is
>> how Kafka Streams know whether to replay the whole change log after the
>> store registers itself or just a part of it and which part (from which
>> offset per partition). There doesn't seem to be any API point through
>> which the store could communicate this information back to Kafka
>> Streams. Is such bookkeeping performed outside the store? Does Kafka
>> Streams first invoke flush() on the store and then notes down the
>> offsets from the change log producer somewhere? So next time the store
>> is brought up, the log is only replayed from last noted down offset? So
>> it can happen that the store gets some log entries that have already
>> been incorporated in it (from the point of one flush before) but never
>> misses any... In any case there has to be an indication somewhere that
>> the store didn't survive and has to be rebuilt from scratch. How do
>> Kafka Streams detect that situation? By placing some marker file into
>> the directory reserved for store's local storage?
>>
>> Regards, Peter
>>
>>

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi Peter,

Thanks for the replies.

Regarding transactions:
Yes, actually, with EOS enabled, the changelog and the output topics are
all produced with the same transactional producer, within the same
transactions. So it should already be atomic.

Regarding restore:
Streams doesn't put the store into service until the restore is completed,
so it should be guaranteed not to happen. But there's of course no
guarantee that I didn't mess something up. I'll take a hard look at it.

Regarding restoration and offsets:
Your guess is correct: Streams tracks the latest stored offset outside of
the store implementation itself, specifically by writing a file (called a
Checkpoint File) in the state directory. If the file is there, it reads
that offset and restores from that point. If the file is missing, it
restores from the beginning of the stream. So it should "just work" for
you. Just for completeness, there have been several edge cases discovered
where this mechanism isn't completely safe, so in the case of EOS, I
believe we actually disregard that checkpoint file and the prior state and
always rebuild from the earliest offset in the changelog.
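
To make the rule above concrete, here is a minimal sketch of the decision as
described in this message. It is a toy model, not the actual Streams code: the
class name, the method name, and the EARLIEST sentinel are all hypothetical,
and the EOS branch only mirrors the "I believe" statement above.

```java
import java.util.OptionalLong;

/** Toy model of the restore decision described above; not the real Streams internals. */
final class RestoreStartSketch {
    /** Hypothetical sentinel meaning "replay the changelog from its earliest offset". */
    static final long EARLIEST = -1L;

    /**
     * @param checkpointedOffset offset read from the checkpoint file, empty if the file is missing
     * @param eosEnabled         whether exactly-once processing is turned on
     * @return the changelog offset to resume from, or EARLIEST for a full rebuild
     */
    static long restoreStartOffset(OptionalLong checkpointedOffset, boolean eosEnabled) {
        if (eosEnabled) {
            // Assumption taken from the text: under EOS the checkpoint is disregarded.
            return EARLIEST;
        }
        // Otherwise resume from the checkpoint, or rebuild fully if there is none.
        return checkpointedOffset.orElse(EARLIEST);
    }
}
```

The point of the sketch is only that the store itself never takes part in this
decision; the offset bookkeeping lives outside of it.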

Personally, I would like to see us provide the ability to store the
checkpoint inside the state store, so that checkpoint updates are
linearized correctly w.r.t. data updates, but I actually haven't mentioned
this thought to anyone until now ;)

Finally, regarding your prior email:
Yes, I was thinking that the "wrong" output values might be part of
rolled-back transactions and therefore enabling read-committed mode on the
consumer might tell a different story than what you've seen to date.

I'm honestly still baffled about those intermediate results that are
sneaking out. I wonder if it's something specific to your data stream, like
maybe an edge case when two records have exactly the same timestamp? I'll
have to stare at the code some more...

Regardless, in order to reap the benefits of running the app with EOS, you
really have to also set your consumers to read_committed. Otherwise, you'll
be seeing output data from aborted (aka rolled-back) transactions, and you
miss the intended "exactly once" guarantee.
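
For reference, the two settings involved might look roughly like the sketch
below. The property names and values ("processing.guarantee" = "exactly_once"
on the Streams side, "isolation.level" = "read_committed" on the consumer
side) are the standard config keys as of the 2.1 line; the class and method
names are just placeholders, and a real application would of course add
bootstrap servers, serdes, and so on.

```java
import java.util.Properties;

/** Sketch of the two configs discussed above; class and method names are hypothetical. */
final class EosConfigSketch {
    /** Settings for the Streams application itself. */
    static Properties streamsProps() {
        Properties p = new Properties();
        // Turns on exactly-once processing in Kafka Streams.
        p.setProperty("processing.guarantee", "exactly_once");
        return p;
    }

    /** Settings for whatever consumer reads the Streams output topic. */
    static Properties outputConsumerProps() {
        Properties p = new Properties();
        // The consumer default is "read_uncommitted", which also returns
        // records that were written in transactions that later aborted.
        p.setProperty("isolation.level", "read_committed");
        return p;
    }
}
```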

Thanks,
-John

On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <pe...@gmail.com> wrote:

> Hi John,
>
> Haven't been able to reinstate the demo yet, but I have been re-reading
> the following scenario of yours....
>
> On 1/24/19 11:48 PM, Peter Levart wrote:
> > Hi John,
> >
> > On 1/24/19 3:18 PM, John Roesler wrote:
> >
> >>
> >> The reason is that, upon restart, the suppression buffer can only
> >> "remember" what got sent & committed to its changelog topic before.
> >>
> >> The scenario I have in mind is:
> >>
> >> ...
> >> * buffer state X
> >> ...
> >> * flush state X to buffer changelog
> >> ...
> >> * commit transaction T0; start new transaction T1
> >> ...
> >> * emit final result X (in uncommitted transaction T1)
> >> ...
> >> * crash before flushing to the changelog the fact that state X was
> >> emitted.
> >> Also, transaction T1 gets aborted, since we crash before committing.
> >> ...
> >> * restart, restoring state X again from the changelog (because the emit
> >> didn't get committed)
> >> * start transaction T2
> >> * emit final result X again (in uncommitted transaction T2)
> >> ...
> >> * commit transaction T2
> >> ...
> >>
> >> So, the result gets emitted twice, but the first time is in an aborted
> >> transaction. This leads me to another clarifying question:
> >>
> >> Based on your first message, it seems like the duplicates you observe
> >> are
> >> in the output topic. When you read the topic, do you configure your
> >> consumer with "read committed" mode? If not, you'll see "results" from
> >> uncommitted transactions, which could explain the duplicates.
>
> ...and I was thinking that perhaps the right solution to the suppression
> problem would be to use transactional producers for the resulting output
> topic AND the store change-log. Is this possible? Does the compaction of
> the log on the brokers work for transactional producers as expected? In
> that case, the sending of final result and the marking of that fact in
> the store change log would together be an atomic operation.
> That said, I think there's another problem with suppression which looks
> like the suppression processor is already processing the input while the
> state store has not been fully restored yet or something related... Is
> this guaranteed not to happen?
>
> And now something unrelated I wanted to ask...
>
> I'm trying to create my own custom state store. From the API I can see
> it is pretty straightforward. One thing that I don't quite understand is
> how Kafka Streams know whether to replay the whole change log after the
> store registers itself or just a part of it and which part (from which
> offset per partition). There doesn't seem to be any API point through
> which the store could communicate this information back to Kafka
> Streams. Is such bookkeeping performed outside the store? Does Kafka
> Streams first invoke flush() on the store and then notes down the
> offsets from the change log producer somewhere? So next time the store
> is brought up, the log is only replayed from last noted down offset? So
> it can happen that the store gets some log entries that have already
> been incorporated in it (from the point of one flush before) but never
> misses any... In any case there has to be an indication somewhere that
> the store didn't survive and has to be rebuilt from scratch. How do
> Kafka Streams detect that situation? By placing some marker file into
> the directory reserved for store's local storage?
>
> Regards, Peter
>
>

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Peter Levart <pe...@gmail.com>.
Hi John,

Haven't been able to reinstate the demo yet, but I have been re-reading 
the following scenario of yours....

On 1/24/19 11:48 PM, Peter Levart wrote:
> Hi John,
>
> On 1/24/19 3:18 PM, John Roesler wrote:
>
>>
>> The reason is that, upon restart, the suppression buffer can only
>> "remember" what got sent & committed to its changelog topic before.
>>
>> The scenario I have in mind is:
>>
>> ...
>> * buffer state X
>> ...
>> * flush state X to buffer changelog
>> ...
>> * commit transaction T0; start new transaction T1
>> ...
>> * emit final result X (in uncommitted transaction T1)
>> ...
>> * crash before flushing to the changelog the fact that state X was 
>> emitted.
>> Also, transaction T1 gets aborted, since we crash before committing.
>> ...
>> * restart, restoring state X again from the changelog (because the emit
>> didn't get committed)
>> * start transaction T2
>> * emit final result X again (in uncommitted transaction T2)
>> ...
>> * commit transaction T2
>> ...
>>
>> So, the result gets emitted twice, but the first time is in an aborted
>> transaction. This leads me to another clarifying question:
>>
>> Based on your first message, it seems like the duplicates you observe 
>> are
>> in the output topic. When you read the topic, do you configure your
>> consumer with "read committed" mode? If not, you'll see "results" from
>> uncommitted transactions, which could explain the duplicates.

...and I was thinking that perhaps the right solution to the suppression 
problem would be to use transactional producers for the resulting output 
topic AND the store change-log. Is this possible? Does the compaction of 
the log on the brokers work for transactional producers as expected? In 
that case, the sending of final result and the marking of that fact in 
the store change log would together be an atomic operation.
That said, I think there's another problem with suppression which looks 
like the suppression processor is already processing the input while the
state store has not been fully restored yet or something related... Is 
this guaranteed not to happen?

And now something unrelated I wanted to ask...

I'm trying to create my own custom state store. From the API I can see
it is pretty straightforward. One thing that I don't quite understand is
how Kafka Streams knows whether to replay the whole change log after the
store registers itself or just a part of it, and which part (from which
offset per partition). There doesn't seem to be any API point through
which the store could communicate this information back to Kafka
Streams. Is such bookkeeping performed outside the store? Does Kafka
Streams first invoke flush() on the store and then note down the
offsets from the change log producer somewhere, so that the next time the
store is brought up, the log is only replayed from the last noted offset?
In that case the store may get some log entries that it has already
incorporated (since the previous flush), but it never misses any... In any
case there has to be an indication somewhere that the store didn't survive
and has to be rebuilt from scratch. How does Kafka Streams detect that
situation? By placing some marker file into the directory reserved for the
store's local storage?

Regards, Peter


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Peter Levart <pe...@gmail.com>.
Hi John,

On 1/24/19 3:18 PM, John Roesler wrote:
> Hi Peter,
>
> Thanks for the clarification.
>
> When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't
> think that Streams automatically registers a shutdown hook. In our examples
> and demos, we register a shutdown hook "outside" of streams (right next to
> the code that calls start() ).
> Unless I missed something, a SIGTERM would still cause Streams to exit
> abruptly, skipping flush and commit. This can cause apparent duplicates *if
> you're not using EOS or if you're reading uncommitted transactions*.

The fact is that Spring which I use to instantiate the KafkaStreams 
object does that:

     @Bean(initMethod = "start", destroyMethod = "close")
     public KafkaStreams processorStreams(...

...so when the JVM gets SIGTERM, the shutdown hook that Spring installs shuts
down the ApplicationContext, which calls the "destroyMethod" of every
registered bean...

And the duplicates are less apparent but still occur even in EOS mode... 
But they are not actual duplicates. They are duplicate(s) only by 
windowed keys, the values are different...

>
> The reason is that, upon restart, the suppression buffer can only
> "remember" what got sent & committed to its changelog topic before.
>
> The scenario I have in mind is:
>
> ...
> * buffer state X
> ...
> * flush state X to buffer changelog
> ...
> * commit transaction T0; start new transaction T1
> ...
> * emit final result X (in uncommitted transaction T1)
> ...
> * crash before flushing to the changelog the fact that state X was emitted.
> Also, transaction T1 gets aborted, since we crash before committing.
> ...
> * restart, restoring state X again from the changelog (because the emit
> didn't get committed)
> * start transaction T2
> * emit final result X again (in uncommitted transaction T2)
> ...
> * commit transaction T2
> ...
>
> So, the result gets emitted twice, but the first time is in an aborted
> transaction. This leads me to another clarifying question:
>
> Based on your first message, it seems like the duplicates you observe are
> in the output topic. When you read the topic, do you configure your
> consumer with "read committed" mode? If not, you'll see "results" from
> uncommitted transactions, which could explain the duplicates.

So when EOS is enabled, the output topics are used in a transactional
manner. The consumer of such a topic should then enable read_committed
semantics...

That would do if my problem were about seeing duplicates of final
windowing results. That is not my problem. My problem is that upon
restart of the processor, I see some non-final window aggregations, followed
by final aggregations for the same windowed key. That's harder to
tolerate in an application. If it were just duplicates of the "correct"
aggregation, I could ignore the 2nd and subsequent messages for the same
windowed key, but if I first get a non-final aggregation, I cannot simply
ignore the 2nd occurrence of the same windowed key. I must cope with
"replacing the previous aggregation with a new version of it" in the app.
That means suppression of non-final results does not buy me anything,
as it does not guarantee that only final results are emitted.

Is it possible that non-final windowed aggregations are emitted in some
scenario, but then such a transaction is rolled back, and I would not see
the non-final aggregations if I enabled read_committed isolation on the
consumer?

I think I'll have to reinstate the demo and try that...

Stay tuned.

Regards, Peter

>
> Likewise, if you were to attach a callback, like "foreach" downstream of
> the suppression, you would see duplicates in the case of a crash. Callbacks
> are a general "hole" in EOS, which I have some ideas to close, but that's a
> separate topic.
>
> There may still be something else going on, but I'm trying to start with
> the simpler explanations.
>
> Thanks again,
> -John
>
>
> On Wed, Jan 23, 2019 at 5:11 AM Peter Levart <pe...@gmail.com> wrote:
>
>> Hi John,
>>
>> Sorry I haven't had time to prepare the minimal reproducer yet. I still
>> have plans to do it though...
>>
>> On 1/22/19 8:02 PM, John Roesler wrote:
>>> Hi Peter,
>>>
>>> Just to follow up on the actual bug, can you confirm whether:
>>> * when you say "restart", do you mean orderly shutdown and restart, or
>>> crash and restart?
>> I start it as SpringBoot application from IDEA and then stop it with the
>> red square button. It does initiate the shutdown sequence before
>> exiting... So I think it is by SIGTERM which initiates JVM shutdown
>> hook(s).
>>
>>> * have you tried this with EOS enabled? I can imagine some ways that
>> there
>>> could be duplicates, but they should be impossible with EOS enabled.
>> Yes, I have EOS enabled.
>>
>>> Thanks for your help,
>>> -John
>> Regards, Peter
>>
>>> On Mon, Jan 14, 2019 at 1:20 PM John Roesler <jo...@confluent.io> wrote:
>>>
>>>> Hi Peter,
>>>>
>>>> I see your train of thought, but the actual implementation of the
>>>> window store is structured differently from your mental model.
>>>> Unlike Key/Value stores, we know that the records in a window
>>>> store will "expire" on a regular schedule, and also that every single
>>>> record will eventually expire. With this in mind, we have implemented
>>>> an optimization to avoid a lot of compaction overhead in RocksDB, as
>>>> well as saving on range scans.
>>>>
>>>> Instead of storing everything in one database, we open several
>>>> databases and bucket windows into them. Then, when windows
>>>> expire, we just ignore the records (i.e., the API makes them
>> unreachable,
>>>> but we don't actually delete them). Once all the windows in a database
>>>> are expired, we just close and delete the whole database. Then, we open
>>>> a new one for new windows. If you look in the code, these databases are
>>>> called "segments".
>>>>
>>>> Thus, I don't think that you should attempt to use the built-in window
>>>> stores
>>>> as you described. Instead, it should be straightforward to implement
>> your
>>>> own StateStore with a layout that's more favorable to your desired
>>>> behavior.
>>>>
>>>> You should also be able to set up the change log the way you need as
>> well.
>>>> Explicitly removed entities also would get removed from the log as
>> well, if
>>>> it's a compacted log.
>>>>
>>>> Actually, what you're describing is *very* similar to the implementation
>>>> for suppress. I might actually suggest that you just copy the
>> suppression
>>>> implementation and adapt it to your needs, or at the very least, study
>>>> how it works. In doing so, you might actually discover the cause of the
>>>> bug yourself!
>>>>
>>>> I hope this helps, and thanks for your help,
>>>> -John
>>>>
>>>>
>>>> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Thank you very much for explaining how WindowStore works. I have some
>>>>> more questions...
>>>>>
>>>>> On 1/10/19 5:33 PM, John Roesler wrote:
>>>>>> Hi Peter,
>>>>>>
>>>>>> Regarding retention, I was not referring to log retention, but to the
>>>>>> window store retention.
>>>>>> Since a new window is created every second (for example), there are in
>>>>>> principle an unbounded
>>>>>> number of windows (the longer the application runs, the more windows
>>>>> there
>>>>>> are, with no end).
>>>>>> However, we obviously can't store an infinite amount of data, so the
>>>>> window
>>>>>> definition includes
>>>>>> a retention period. By default, this is 24 hours. After the retention
>>>>>> period elapses, all of the data
>>>>>> for the window is purged to make room for new windows.
>>>>> Right. Would the following work for example:
>>>>>
>>>>> - configure retention of WindowStore to be "infinite"
>>>>> - explicitly remove records from the store when windows are flushed out
>>>>> - configure WindowStore log topic for compacting
>>>>>
>>>>> Something like the following:
>>>>>
>>>>>            Stores
>>>>>                .windowStoreBuilder(
>>>>>                    Stores.persistentWindowStore(
>>>>>                        storeName,
>>>>>                        // retentionPeriod: roughly 1000 years.
>>>>>                        // Duration.of(1000L, ChronoUnit.YEARS) would throw,
>>>>>                        // because YEARS is an estimated unit.
>>>>>                        Duration.ofDays(365_000L),
>>>>>                        Duration.ofSeconds(10), // windowSize
>>>>>                        false // retainDuplicates
>>>>>                    ),
>>>>>                    keySerde, valSerde
>>>>>                )
>>>>>                .withCachingEnabled()
>>>>>                .withLoggingEnabled(
>>>>>                    Map.of(
>>>>>                        TopicConfig.CLEANUP_POLICY_CONFIG,
>>>>>                        TopicConfig.CLEANUP_POLICY_COMPACT
>>>>>                    )
>>>>>                );
>>>>>
>>>>> Would in above scenario:
>>>>>
>>>>> - the on-disk WindowStore be kept bounded (there could be some very old
>>>>> entries in it but majority will be new - depending on the activity of
>>>>> particular input keys)
>>>>> - the log topic be kept bounded (explicitly removed entries would be
>>>>> removed from compacted log too)
>>>>>
>>>>> I'm moving away from DSL partly because I have some problems with
>>>>> suppression (which I hope we'll be able to fix) and partly because the
>>>>> DSL can't give me the complicated semantics that I need for the
>>>>> application at hand. I tried to capture what I need in a custom
>>>>> Transformer here:
>>>>>
>>>>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>>>>>
>>>>> Your knowledge of how WindowStore works would greatly help me decide if
>>>>> this is a workable idea.
>>>>>
>>>>>> So what I meant was that if you buffer some key "A" in window (Monday
>>>>>> 09:00:00) and then get
>>>>>> no further activity for A for over 24 hours, then when you do get that
>>>>> next
>>>>>> event for A, say at
>>>>>> (Tuesday 11:00:00), you'd do the scan but find nothing, since your
>>>>> buffered
>>>>>> state would already
>>>>>> have been purged from the store.
>>>>> Right. That would be the case when WindowStore was configured with
>>>>> default retention of 24 hours. A quick question: What does window size
>>>>> configuration for WindowStore (see above) do? Does it have to be
>>>>> synchronized with the size of windows stored in it?
>>>>>
>>>>>> The way I avoided this problem for Suppression was to organize the
>> data
>>>>> by
>>>>>> timestamp instead
>>>>>> of by key, so on *every* update I can search for all the keys that are
>>>>> old
>>>>>> enough and emit them.
>>>>>> I also don't use a window store, so I don't have to worry about the
>>>>>> retention time.
>>>>>>
>>>>>> To answer your question about the window store's topic, it configures
>> a
>>>>>> retention time the same
>>>>>> length as the store's retention time, (and the keys are the full
>>>>> windowed
>>>>>> key including the window
>>>>>> start time), so it'll have roughly the same size bound as the store
>>>>> itself.
>>>>>
>>>>> Would explicitly removed entries from WindowStore be removed from log
>>>>> too if it was a compacting log?
>>>>>
>>>>>> Back to the process of figuring out what might be wrong with
>>>>> Suppression, I
>>>>>> don't suppose you
>>>>>> would be able to file a Jira and upload a repro program? If not,
>> that's
>>>>> ok.
>>>>>> I haven't been able to
>>>>>> reproduce the bug yet, but it seems like it's happening somewhat
>>>>>> consistently for you, so I should
>>>>>> be able to get it to happen eventually.
>>>>>>
>>>>>> Thanks, and sorry again for the troubles.
>>>>>> -John
>>>>> I can prepare a minimal reproducer. No problem...
>>>>>
>>>>> Regards, Peter
>>>>>
>>>>>> On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com>
>>>>> wrote:
>>>>>>> On 1/8/19 12:57 PM, Peter Levart wrote:
>>>>>>>> Hi John,
>>>>>>>>
>>>>>>>> On 1/8/19 12:45 PM, Peter Levart wrote:
>>>>>>>>>> I looked at your custom transformer, and it looks almost correct to
>>>>>>>>>> me. The
>>>>>>>>>> only flaw seems to be that it only looks
>>>>>>>>>> for closed windows for the key currently being processed, which
>>>>>>>>>> means that
>>>>>>>>>> if you have key "A" buffered, but don't get another event for it
>>>>> for a
>>>>>>>>>> while after the window closes, you won't emit the final result.
>> This
>>>>>>>>>> might
>>>>>>>>>> actually take longer than the window retention period, in which
>>>>>>>>>> case, the
>>>>>>>>>> data would be deleted without ever emitting the final result.
>>>>>>>>> So in DSL case, the suppression works by flushing *all* of the
>> "ripe"
>>>>>>>>> windows in the whole buffer whenever a single event comes in with
>>>>>>>>> recent enough timestamp regardless of the key of that event?
>>>>>>>>>
>>>>>>>>> Is the buffer shared among processing tasks or does each task
>>>>>>>>> maintain its own private buffer that only contains its share of
>> data
>>>>>>>>> pertaining to assigned input partitions? In case the tasks are
>>>>>>>>> executed on several processing JVM(s) the buffer can't really be
>>>>>>>>> shared, right? In that case a single event can't flush all of the
>>>>>>>>> "ripe" windows, but just those that are contained in the task's
>> part
>>>>>>>>> of buffer...
>>>>>>>> Just a question about your comment above:
>>>>>>>>
>>>>>>>> /"This might actually take longer than the window retention period,
>> in
>>>>>>>> which case, the data would be deleted without ever emitting the
>> final
>>>>>>>> result"/
>>>>>>>>
>>>>>>>> Are you talking about the buffer log topic retention? Aren't log
>>>>>>>> topics configured to "compact" rather than "delete" messages? So the
>>>>>>>> last "version" of the buffer entry for a particular key should stay
>>>>>>>> forever? What are the keys in suppression buffer log topic? Are
>> they a
>>>>>>>> pair of (timestamp, key) ? Probably not since in that case the
>>>>>>>> compacted log would grow indefinitely...
>>>>>>>>
>>>>>>>> Another question:
>>>>>>>>
>>>>>>>> What are the keys in WindowStore's log topic? If the input keys to
>> the
>>>>>>>> processor that uses such WindowStore consist of a bounded set of
>>>>>>>> values (for example user ids), would compacted log of such
>> WindowStore
>>>>>>>> also be bounded?
>>>>>>> In case the key of WindowStore log topic is (timestamp, key) then
>> would
>>>>>>> explicitly deleting flushed entries from WindowStore (by putting null
>>>>>>> value into the store) keep the compacted log bounded? In other words,
>>>>>>> does WindowStore log topic support a special kind of "tombstone"
>>>>> message
>>>>>>> that effectively removes the key from the compacted log?
>>>>>>>
>>>>>>> In that case, my custom processor could keep entries in its
>> WindowStore
>>>>>>> for as long as needed, depending on the activity of a particular input
>>>>>>> key...
>>>>>>>
>>>>>>>> Regards, Peter
>>>>>>>>
>>>>>>>>
>>


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi Peter,

Thanks for the clarification.

When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't
think that Streams automatically registers a shutdown hook. In our examples
and demos, we register a shutdown hook "outside" of streams (right next to
the code that calls start() ).
Unless I missed something, a SIGTERM would still cause Streams to exit
abruptly, skipping flush and commit. This can cause apparent duplicates *if
you're not using EOS or if you're reading uncommitted transactions*.
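
As an illustration of registering a shutdown hook "outside" of Streams, a
minimal sketch might look like the following. It deliberately uses a plain
AutoCloseable as a stand-in for the KafkaStreams instance so that it runs
without any Kafka dependency; the class name, method name, and thread name
are made up for the example.

```java
/** Sketch of a shutdown hook that closes a resource on JVM exit; names are hypothetical. */
final class ShutdownHookSketch {
    /** Builds (but does not register) a hook thread that closes the given resource. */
    static Thread closingHook(AutoCloseable streams) {
        return new Thread(() -> {
            try {
                streams.close();
            } catch (Exception e) {
                // This late in shutdown there is little to do except report the failure.
                e.printStackTrace();
            }
        }, "streams-shutdown-hook");
    }

    public static void main(String[] args) {
        // Stand-in for the KafkaStreams object; the real one would be built from a topology.
        AutoCloseable streams = () -> System.out.println("closing streams");
        // Register the hook right next to the code that would call start().
        Runtime.getRuntime().addShutdownHook(closingHook(streams));
        // streams.start() would go here in a real application.
    }
}
```

With a hook like this in place, a SIGTERM runs close() before the JVM exits,
which gives Streams the chance to flush and commit.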

The reason is that, upon restart, the suppression buffer can only
"remember" what got sent & committed to its changelog topic before.

The scenario I have in mind is:

...
* buffer state X
...
* flush state X to buffer changelog
...
* commit transaction T0; start new transaction T1
...
* emit final result X (in uncommitted transaction T1)
...
* crash before flushing to the changelog the fact that state X was emitted.
Also, transaction T1 gets aborted, since we crash before committing.
...
* restart, restoring state X again from the changelog (because the emit
didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
...
* commit transaction T2
...

So, the result gets emitted twice, but the first time is in an aborted
transaction. This leads me to another clarifying question:

Based on your first message, it seems like the duplicates you observe are
in the output topic. When you read the topic, do you configure your
consumer with "read committed" mode? If not, you'll see "results" from
uncommitted transactions, which could explain the duplicates.
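
To illustrate the scenario above, here is a small self-contained simulation.
It does not use Kafka at all: the output topic is modeled as a list of
records tagged with the fate of their transaction, and the "consumer" is a
simple filter. All names are hypothetical, and the model ignores everything
except the committed/aborted distinction.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of how isolation level changes what a reader sees; not real Kafka code. */
final class IsolationSketch {
    /** One output record plus the fate of the transaction that wrote it. */
    static final class Rec {
        final String value;
        final String txn;
        final boolean committed;

        Rec(String value, String txn, boolean committed) {
            this.value = value;
            this.txn = txn;
            this.committed = committed;
        }
    }

    /** The output topic after the crash-and-restart scenario described above. */
    static List<Rec> outputTopic() {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("X", "T1", false)); // emitted, then T1 was aborted by the crash
        log.add(new Rec("X", "T2", true));  // re-emitted after restore, T2 committed
        return log;
    }

    /** Returns the values a reader sees: everything, or only committed records. */
    static List<String> read(List<Rec> log, boolean readCommitted) {
        List<String> seen = new ArrayList<>();
        for (Rec r : log) {
            if (!readCommitted || r.committed) {
                seen.add(r.value);
            }
        }
        return seen;
    }
}
```

In this model, a reader that does not filter on commit status sees X twice,
while a "read committed" reader sees it once.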

Likewise, if you were to attach a callback, like "foreach" downstream of
the suppression, you would see duplicates in the case of a crash. Callbacks
are a general "hole" in EOS, which I have some ideas to close, but that's a
separate topic.

There may still be something else going on, but I'm trying to start with
the simpler explanations.

Thanks again,
-John


On Wed, Jan 23, 2019 at 5:11 AM Peter Levart <pe...@gmail.com> wrote:

> Hi John,
>
> Sorry I haven't had time to prepare the minimal reproducer yet. I still
> have plans to do it though...
>
> On 1/22/19 8:02 PM, John Roesler wrote:
> > Hi Peter,
> >
> > Just to follow up on the actual bug, can you confirm whether:
> > * when you say "restart", do you mean orderly shutdown and restart, or
> > crash and restart?
>
> I start it as SpringBoot application from IDEA and then stop it with the
> red square button. It does initiate the shutdown sequence before
> exiting... So I think it is by SIGTERM which initiates JVM shutdown
> hook(s).
>
> > * have you tried this with EOS enabled? I can imagine some ways that
> there
> > could be duplicates, but they should be impossible with EOS enabled.
>
> Yes, I have EOS enabled.
>
> >
> > Thanks for your help,
> > -John
>
> Regards, Peter
>
> >
> > On Mon, Jan 14, 2019 at 1:20 PM John Roesler <jo...@confluent.io> wrote:
> >
> >> Hi Peter,
> >>
> >> I see your train of thought, but the actual implementation of the
> >> window store is structured differently from your mental model.
> >> Unlike Key/Value stores, we know that the records in a window
> >> store will "expire" on a regular schedule, and also that every single
> >> record will eventually expire. With this in mind, we have implemented
> >> an optimization to avoid a lot of compaction overhead in RocksDB, as
> >> well as saving on range scans.
> >>
> >> Instead of storing everything in one database, we open several
> >> databases and bucket windows into them. Then, when windows
> >> expire, we just ignore the records (i.e., the API makes them
> >> unreachable, but we don't actually delete them). Once all the windows
> >> in a database
> >> are expired, we just close and delete the whole database. Then, we open
> >> a new one for new windows. If you look in the code, these databases are
> >> called "segments".
> >>
> >> Thus, I don't think that you should attempt to use the built-in window
> >> stores as you described. Instead, it should be straightforward to
> >> implement your own StateStore with a layout that's more favorable to
> >> your desired behavior.
> >>
> >> You should also be able to set up the changelog the way you need.
> >> Explicitly removed entries would also get removed from the log, if
> >> it's a compacted log.
> >>
> >> Actually, what you're describing is *very* similar to the implementation
> >> for suppress. I might actually suggest that you just copy the
> >> suppression implementation and adapt it to your needs, or at the
> >> very least, study
> >> how it works. In doing so, you might actually discover the cause of the
> >> bug yourself!
> >>
> >> I hope this helps, and thanks for your help,
> >> -John
> >>
> >>
> >> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com>
> >> wrote:
> >>
> >>> Hi John,
> >>>
> >>> Thank you very much for explaining how WindowStore works. I have some
> >>> more questions...
> >>>
> >>> On 1/10/19 5:33 PM, John Roesler wrote:
> >>>> Hi Peter,
> >>>>
> >>>> Regarding retention, I was not referring to log retention, but to the
> >>>> window store retention.
> >>>> Since a new window is created every second (for example), there are in
> >>>> principle an unbounded
> >>>> number of windows (the longer the application runs, the more windows
> >>> there
> >>>> are, with no end).
> >>>> However, we obviously can't store an infinite amount of data, so the
> >>> window
> >>>> definition includes
> >>>> a retention period. By default, this is 24 hours. After the retention
> >>>> period elapses, all of the data
> >>>> for the window is purged to make room for new windows.
> >>> Right. Would the following work for example:
> >>>
> >>> - configure retention of WindowStore to be "infinite"
> >>> - explicitly remove records from the store when windows are flushed out
> >>> - configure WindowStore log topic for compacting
> >>>
> >>> Something like the following:
> >>>
> >>>           Stores
> >>>               .windowStoreBuilder(
> >>>                   Stores.persistentWindowStore(
> >>>                       storeName,
> >>>                       // retentionPeriod: ~1000 years, i.e. effectively "infinite".
> >>>                       // Duration.of(1000L, ChronoUnit.YEARS) would throw, because
> >>>                       // YEARS has only an estimated duration.
> >>>                       Duration.ofDays(365_000L),
> >>>                       Duration.ofSeconds(10), // windowSize
> >>>                       false // retainDuplicates
> >>>                   ),
> >>>                   keySerde, valSerde
> >>>               )
> >>>               .withCachingEnabled()
> >>>               .withLoggingEnabled(
> >>>                   Map.of(
> >>>                       TopicConfig.CLEANUP_POLICY_CONFIG,
> >>>                       TopicConfig.CLEANUP_POLICY_COMPACT
> >>>                   )
> >>>               );
> >>>
> >>> In the above scenario, would:
> >>>
> >>> - the on-disk WindowStore be kept bounded (there could be some very old
> >>> entries in it but majority will be new - depending on the activity of
> >>> particular input keys)
> >>> - the log topic be kept bounded (explicitly removed entries would be
> >>> removed from compacted log too)
> >>>
> >>> I'm moving away from DSL partly because I have some problems with
> >>> suppression (which I hope we'll be able to fix) and partly because the
> >>> DSL can't give me the complicated semantics that I need for the
> >>> application at hand. I tried to capture what I need in a custom
> >>> Transformer here:
> >>>
> >>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
> >>>
> >>> Your knowledge of how WindowStore works would greatly help me decide if
> >>> this is a workable idea.
> >>>
> >>>> So what I meant was that if you buffer some key "A" in window (Monday
> >>>> 09:00:00) and then get
> >>>> no further activity for A for over 24 hours, then when you do get that
> >>> next
> >>>> event for A, say at
> >>>> (Tuesday 11:00:00), you'd do the scan but find nothing, since your
> >>> buffered
> >>>> state would already
> >>>> have been purged from the store.
> >>> Right. That would be the case when WindowStore was configured with
> >>> default retention of 24 hours. A quick question: What does window size
> >>> configuration for WindowStore (see above) do? Does it have to be
> >>> synchronized with the size of windows stored in it?
> >>>
> >>>> The way I avoided this problem for Suppression was to organize the
> data
> >>> by
> >>>> timestamp instead
> >>>> of by key, so on *every* update I can search for all the keys that are
> >>> old
> >>>> enough and emit them.
> >>>> I also don't use a window store, so I don't have to worry about the
> >>>> retention time.
> >>>>
> >>>> To answer your question about the window store's topic, it configures
> >>>> a retention time the same length as the store's retention time (and
> >>>> the keys are the full windowed key, including the window start time),
> >>>> so it'll have roughly the same size bound as the store itself.
> >>>
> >>> Would explicitly removed entries from WindowStore be removed from log
> >>> too if it was a compacting log?
> >>>
> >>>> Back to the process of figuring out what might be wrong with
> >>> Suppression, I
> >>>> don't suppose you
> >>>> would be able to file a Jira and upload a repro program? If not,
> that's
> >>> ok.
> >>>> I haven't been able to
> >>>> reproduce the bug yet, but it seems like it's happening somewhat
> >>>> consistently for you, so I should
> >>>> be able to get it to happen eventually.
> >>>>
> >>>> Thanks, and sorry again for the troubles.
> >>>> -John
> >>> I can prepare a minimal reproducer. No problem...
> >>>
> >>> Regards, Peter
> >>>
> >>>> On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com>
> >>> wrote:
> >>>>> On 1/8/19 12:57 PM, Peter Levart wrote:
> >>>>>> Hi John,
> >>>>>>
> >>>>>> On 1/8/19 12:45 PM, Peter Levart wrote:
> >>>>>>>> I looked at your custom transformer, and it looks almost correct to
> >>>>>>>> me. The
> >>>>>>>> only flaw seems to be that it only looks
> >>>>>>>> for closed windows for the key currently being processed, which
> >>>>>>>> means that
> >>>>>>>> if you have key "A" buffered, but don't get another event for it
> >>> for a
> >>>>>>>> while after the window closes, you won't emit the final result.
> This
> >>>>>>>> might
> >>>>>>>> actually take longer than the window retention period, in which
> >>>>>>>> case, the
> >>>>>>>> data would be deleted without ever emitting the final result.
> >>>>>>> So in the DSL case, the suppression works by flushing *all* of the
> >>>>>>> "ripe" windows in the whole buffer whenever a single event comes in
> >>>>>>> with a recent enough timestamp, regardless of the key of that event?
> >>>>>>>
> >>>>>>> Is the buffer shared among processing tasks or does each task
> >>>>>>> maintain its own private buffer that only contains its share of
> data
> >>>>>>> pertaining to assigned input partitions? In case the tasks are
> >>>>>>> executed on several processing JVM(s) the buffer can't really be
> >>>>>>> shared, right? In that case a single event can't flush all of the
> >>>>>>> "ripe" windows, but just those that are contained in the task's
> part
> >>>>>>> of buffer...
> >>>>>> Just a question about your comment above:
> >>>>>>
> >>>>>> /"This might actually take longer than the window retention period,
> in
> >>>>>> which case, the data would be deleted without ever emitting the
> final
> >>>>>> result"/
> >>>>>>
> >>>>>> Are you talking about the buffer log topic retention? Aren't log
> >>>>>> topics configured to "compact" rather than "delete" messages? So the
> >>>>>> last "version" of the buffer entry for a particular key should stay
> >>>>>> forever? What are the keys in suppression buffer log topic? Are
> they a
> >>>>>> pair of (timestamp, key) ? Probably not since in that case the
> >>>>>> compacted log would grow indefinitely...
> >>>>>>
> >>>>>> Another question:
> >>>>>>
> >>>>>> What are the keys in WindowStore's log topic? If the input keys to
> the
> >>>>>> processor that uses such WindowStore consist of a bounded set of
> >>>>>> values (for example user ids), would compacted log of such
> WindowStore
> >>>>>> also be bounded?
> >>>>> In case the key of WindowStore log topic is (timestamp, key) then
> would
> >>>>> explicitly deleting flushed entries from WindowStore (by putting null
> >>>>> value into the store) keep the compacted log bounded? In other words,
> >>>>> does WindowStore log topic support a special kind of "tombstone"
> >>> message
> >>>>> that effectively removes the key from the compacted log?
> >>>>>
> >>>>> In that case, my custom processor could keep entries in its
> >>>>> WindowStore for as long as needed, depending on the activity of a
> >>>>> particular input key...
> >>>>>
> >>>>>> Regards, Peter
> >>>>>>
> >>>>>>
> >>>
>
>

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Peter Levart <pe...@gmail.com>.
Hi John,

Sorry I haven't had time to prepare the minimal reproducer yet. I still 
have plans to do it though...

On 1/22/19 8:02 PM, John Roesler wrote:
> Hi Peter,
>
> Just to follow up on the actual bug, can you confirm whether:
> * when you say "restart", do you mean orderly shutdown and restart, or
> crash and restart?

I start it as a Spring Boot application from IDEA and then stop it with the
red square button. It does initiate the shutdown sequence before
exiting, so I think IDEA sends SIGTERM, which triggers the JVM shutdown hook(s).

> * have you tried this with EOS enabled? I can imagine some ways that there
> could be duplicates, but they should be impossible with EOS enabled.

Yes, I have EOS enabled.

>
> Thanks for your help,
> -John

Regards, Peter

>
> On Mon, Jan 14, 2019 at 1:20 PM John Roesler <jo...@confluent.io> wrote:
>
>> Hi Peter,
>>
>> I see your train of thought, but the actual implementation of the
>> window store is structured differently from your mental model.
>> Unlike Key/Value stores, we know that the records in a window
>> store will "expire" on a regular schedule, and also that every single
>> record will eventually expire. With this in mind, we have implemented
>> an optimization to avoid a lot of compaction overhead in RocksDB, as
>> well as saving on range scans.
>>
>> Instead of storing everything in one database, we open several
>> databases and bucket windows into them. Then, when windows
>> expire, we just ignore the records (i.e., the API makes them unreachable,
>> but we don't actually delete them). Once all the windows in a database
>> are expired, we just close and delete the whole database. Then, we open
>> a new one for new windows. If you look in the code, these databases are
>> called "segments".
>>
>> Thus, I don't think that you should attempt to use the built-in window
>> stores
>> as you described. Instead, it should be straightforward to implement your
>> own StateStore with a layout that's more favorable to your desired
>> behavior.
>>
>> You should also be able to set up the changelog the way you need.
>> Explicitly removed entries would also get removed from the log, if
>> it's a compacted log.
>>
>> Actually, what you're describing is *very* similar to the implementation
>> for suppress. I might actually suggest that you just copy the suppression
>> implementation and adapt it to your needs, or at the very least, study
>> how it works. In doing so, you might actually discover the cause of the
>> bug yourself!
>>
>> I hope this helps, and thanks for your help,
>> -John
>>
>>
>> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com>
>> wrote:
>>
>>> Hi John,
>>>
>>> Thank you very much for explaining how WindowStore works. I have some
>>> more questions...
>>>
>>> On 1/10/19 5:33 PM, John Roesler wrote:
>>>> Hi Peter,
>>>>
>>>> Regarding retention, I was not referring to log retention, but to the
>>>> window store retention.
>>>> Since a new window is created every second (for example), there are in
>>>> principle an unbounded
>>>> number of windows (the longer the application runs, the more windows
>>> there
>>>> are, with no end).
>>>> However, we obviously can't store an infinite amount of data, so the
>>> window
>>>> definition includes
>>>> a retention period. By default, this is 24 hours. After the retention
>>>> period elapses, all of the data
>>>> for the window is purged to make room for new windows.
>>> Right. Would the following work for example:
>>>
>>> - configure retention of WindowStore to be "infinite"
>>> - explicitly remove records from the store when windows are flushed out
>>> - configure WindowStore log topic for compacting
>>>
>>> Something like the following:
>>>
>>>           Stores
>>>               .windowStoreBuilder(
>>>                   Stores.persistentWindowStore(
>>>                       storeName,
>>>                       // retentionPeriod: ~1000 years, i.e. effectively "infinite".
>>>                       // Duration.of(1000L, ChronoUnit.YEARS) would throw, because
>>>                       // YEARS has only an estimated duration.
>>>                       Duration.ofDays(365_000L),
>>>                       Duration.ofSeconds(10), // windowSize
>>>                       false // retainDuplicates
>>>                   ),
>>>                   keySerde, valSerde
>>>               )
>>>               .withCachingEnabled()
>>>               .withLoggingEnabled(
>>>                   Map.of(
>>>                       TopicConfig.CLEANUP_POLICY_CONFIG,
>>>                       TopicConfig.CLEANUP_POLICY_COMPACT
>>>                   )
>>>               );
>>>
>>> In the above scenario, would:
>>>
>>> - the on-disk WindowStore be kept bounded (there could be some very old
>>> entries in it but majority will be new - depending on the activity of
>>> particular input keys)
>>> - the log topic be kept bounded (explicitly removed entries would be
>>> removed from compacted log too)
>>>
>>> I'm moving away from DSL partly because I have some problems with
>>> suppression (which I hope we'll be able to fix) and partly because the
>>> DSL can't give me the complicated semantics that I need for the
>>> application at hand. I tried to capture what I need in a custom
>>> Transformer here:
>>>
>>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>>>
>>> Your knowledge of how WindowStore works would greatly help me decide if
>>> this is a workable idea.
>>>
>>>> So what I meant was that if you buffer some key "A" in window (Monday
>>>> 09:00:00) and then get
>>>> no further activity for A for over 24 hours, then when you do get that
>>> next
>>>> event for A, say at
>>>> (Tuesday 11:00:00), you'd do the scan but find nothing, since your
>>> buffered
>>>> state would already
>>>> have been purged from the store.
>>> Right. That would be the case when WindowStore was configured with
>>> default retention of 24 hours. A quick question: What does window size
>>> configuration for WindowStore (see above) do? Does it have to be
>>> synchronized with the size of windows stored in it?
>>>
>>>> The way I avoided this problem for Suppression was to organize the data
>>> by
>>>> timestamp instead
>>>> of by key, so on *every* update I can search for all the keys that are
>>> old
>>>> enough and emit them.
>>>> I also don't use a window store, so I don't have to worry about the
>>>> retention time.
>>>>
>>>> To answer your question about the window store's topic, it configures
>>>> a retention time the same length as the store's retention time (and
>>>> the keys are the full windowed key, including the window start time),
>>>> so it'll have roughly the same size bound as the store itself.
>>>
>>> Would explicitly removed entries from WindowStore be removed from log
>>> too if it was a compacting log?
>>>
>>>> Back to the process of figuring out what might be wrong with
>>> Suppression, I
>>>> don't suppose you
>>>> would be able to file a Jira and upload a repro program? If not, that's
>>> ok.
>>>> I haven't been able to
>>>> reproduce the bug yet, but it seems like it's happening somewhat
>>>> consistently for you, so I should
>>>> be able to get it to happen eventually.
>>>>
>>>> Thanks, and sorry again for the troubles.
>>>> -John
>>> I can prepare a minimal reproducer. No problem...
>>>
>>> Regards, Peter
>>>
>>>> On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com>
>>> wrote:
>>>>> On 1/8/19 12:57 PM, Peter Levart wrote:
>>>>>> Hi John,
>>>>>>
>>>>>> On 1/8/19 12:45 PM, Peter Levart wrote:
>>>>>>>> I looked at your custom transformer, and it looks almost correct to
>>>>>>>> me. The
>>>>>>>> only flaw seems to be that it only looks
>>>>>>>> for closed windows for the key currently being processed, which
>>>>>>>> means that
>>>>>>>> if you have key "A" buffered, but don't get another event for it
>>> for a
>>>>>>>> while after the window closes, you won't emit the final result. This
>>>>>>>> might
>>>>>>>> actually take longer than the window retention period, in which
>>>>>>>> case, the
>>>>>>>> data would be deleted without ever emitting the final result.
>>>>>>> So in the DSL case, the suppression works by flushing *all* of the
>>>>>>> "ripe" windows in the whole buffer whenever a single event comes in
>>>>>>> with a recent enough timestamp, regardless of the key of that event?
>>>>>>>
>>>>>>> Is the buffer shared among processing tasks or does each task
>>>>>>> maintain its own private buffer that only contains its share of data
>>>>>>> pertaining to assigned input partitions? In case the tasks are
>>>>>>> executed on several processing JVM(s) the buffer can't really be
>>>>>>> shared, right? In that case a single event can't flush all of the
>>>>>>> "ripe" windows, but just those that are contained in the task's part
>>>>>>> of buffer...
>>>>>> Just a question about your comment above:
>>>>>>
>>>>>> /"This might actually take longer than the window retention period, in
>>>>>> which case, the data would be deleted without ever emitting the final
>>>>>> result"/
>>>>>>
>>>>>> Are you talking about the buffer log topic retention? Aren't log
>>>>>> topics configured to "compact" rather than "delete" messages? So the
>>>>>> last "version" of the buffer entry for a particular key should stay
>>>>>> forever? What are the keys in suppression buffer log topic? Are they a
>>>>>> pair of (timestamp, key) ? Probably not since in that case the
>>>>>> compacted log would grow indefinitely...
>>>>>>
>>>>>> Another question:
>>>>>>
>>>>>> What are the keys in WindowStore's log topic? If the input keys to the
>>>>>> processor that uses such WindowStore consist of a bounded set of
>>>>>> values (for example user ids), would compacted log of such WindowStore
>>>>>> also be bounded?
>>>>> In case the key of WindowStore log topic is (timestamp, key) then would
>>>>> explicitly deleting flushed entries from WindowStore (by putting null
>>>>> value into the store) keep the compacted log bounded? In other words,
>>>>> does WindowStore log topic support a special kind of "tombstone"
>>> message
>>>>> that effectively removes the key from the compacted log?
>>>>>
>>>>> In that case, my custom processor could keep entries in its WindowStore
>>>>> for as long as needed, depending on the activity of a particular input
>>>>> key...
>>>>>
>>>>>> Regards, Peter
>>>>>>
>>>>>>
>>>


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi Peter,

Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or
crash and restart?
* have you tried this with EOS enabled? I can imagine some ways that there
could be duplicates, but they should be impossible with EOS enabled.

Thanks for your help,
-John

On Mon, Jan 14, 2019 at 1:20 PM John Roesler <jo...@confluent.io> wrote:

> Hi Peter,
>
> I see your train of thought, but the actual implementation of the
> window store is structured differently from your mental model.
> Unlike Key/Value stores, we know that the records in a window
> store will "expire" on a regular schedule, and also that every single
> record will eventually expire. With this in mind, we have implemented
> an optimization to avoid a lot of compaction overhead in RocksDB, as
> well as saving on range scans.
>
> Instead of storing everything in one database, we open several
> databases and bucket windows into them. Then, when windows
> expire, we just ignore the records (i.e., the API makes them unreachable,
> but we don't actually delete them). Once all the windows in a database
> are expired, we just close and delete the whole database. Then, we open
> a new one for new windows. If you look in the code, these databases are
> called "segments".
>
> Thus, I don't think that you should attempt to use the built-in window
> stores
> as you described. Instead, it should be straightforward to implement your
> own StateStore with a layout that's more favorable to your desired
> behavior.
>
> You should also be able to set up the changelog the way you need.
> Explicitly removed entries would also get removed from the log, if
> it's a compacted log.
>
> Actually, what you're describing is *very* similar to the implementation
> for suppress. I might actually suggest that you just copy the suppression
> implementation and adapt it to your needs, or at the very least, study
> how it works. In doing so, you might actually discover the cause of the
> bug yourself!
>
> I hope this helps, and thanks for your help,
> -John
>
>
> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com>
> wrote:
>
>> Hi John,
>>
>> Thank you very much for explaining how WindowStore works. I have some
>> more questions...
>>
>> On 1/10/19 5:33 PM, John Roesler wrote:
>> > Hi Peter,
>> >
>> > Regarding retention, I was not referring to log retention, but to the
>> > window store retention.
>> > Since a new window is created every second (for example), there are in
>> > principle an unbounded
>> > number of windows (the longer the application runs, the more windows
>> there
>> > are, with no end).
>> > However, we obviously can't store an infinite amount of data, so the
>> window
>> > definition includes
>> > a retention period. By default, this is 24 hours. After the retention
>> > period elapses, all of the data
>> > for the window is purged to make room for new windows.
>>
>> Right. Would the following work for example:
>>
>> - configure retention of WindowStore to be "infinite"
>> - explicitly remove records from the store when windows are flushed out
>> - configure WindowStore log topic for compacting
>>
>> Something like the following:
>>
>>          Stores
>>              .windowStoreBuilder(
>>                  Stores.persistentWindowStore(
>>                      storeName,
>>                      // retentionPeriod: ~1000 years, i.e. effectively "infinite".
>>                      // Duration.of(1000L, ChronoUnit.YEARS) would throw, because
>>                      // YEARS has only an estimated duration.
>>                      Duration.ofDays(365_000L),
>>                      Duration.ofSeconds(10), // windowSize
>>                      false // retainDuplicates
>>                  ),
>>                  keySerde, valSerde
>>              )
>>              .withCachingEnabled()
>>              .withLoggingEnabled(
>>                  Map.of(
>>                      TopicConfig.CLEANUP_POLICY_CONFIG,
>>                      TopicConfig.CLEANUP_POLICY_COMPACT
>>                  )
>>              );
>>
>> In the above scenario, would:
>>
>> - the on-disk WindowStore be kept bounded (there could be some very old
>> entries in it but majority will be new - depending on the activity of
>> particular input keys)
>> - the log topic be kept bounded (explicitly removed entries would be
>> removed from compacted log too)
>>
>> I'm moving away from DSL partly because I have some problems with
>> suppression (which I hope we'll be able to fix) and partly because the
>> DSL can't give me the complicated semantics that I need for the
>> application at hand. I tried to capture what I need in a custom
>> Transformer here:
>>
>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>>
>> Your knowledge of how WindowStore works would greatly help me decide if
>> this is a workable idea.
>>
>> >
>> > So what I meant was that if you buffer some key "A" in window (Monday
>> > 09:00:00) and then get
>> > no further activity for A for over 24 hours, then when you do get that
>> next
>> > event for A, say at
>> > (Tuesday 11:00:00), you'd do the scan but find nothing, since your
>> buffered
>> > state would already
>> > have been purged from the store.
>>
>> Right. That would be the case when WindowStore was configured with
>> default retention of 24 hours. A quick question: What does window size
>> configuration for WindowStore (see above) do? Does it have to be
>> synchronized with the size of windows stored in it?
>>
>> >
>> > The way I avoided this problem for Suppression was to organize the data
>> by
>> > timestamp instead
>> > of by key, so on *every* update I can search for all the keys that are
>> old
>> > enough and emit them.
>> > I also don't use a window store, so I don't have to worry about the
>> > retention time.
>> >
>> > To answer your question about the window store's topic, it configures
>> > a retention time the same length as the store's retention time (and
>> > the keys are the full windowed key, including the window start time),
>> > so it'll have roughly the same size bound as the store itself.
>>
>> Would explicitly removed entries from WindowStore be removed from log
>> too if it was a compacting log?
>>
>> >
>> > Back to the process of figuring out what might be wrong with
>> Suppression, I
>> > don't suppose you
>> > would be able to file a Jira and upload a repro program? If not, that's
>> ok.
>> > I haven't been able to
>> > reproduce the bug yet, but it seems like it's happening somewhat
>> > consistently for you, so I should
>> > be able to get it to happen eventually.
>> >
>> > Thanks, and sorry again for the troubles.
>> > -John
>>
>> I can prepare a minimal reproducer. No problem...
>>
>> Regards, Peter
>>
>> >
>> > On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com>
>> wrote:
>> >
>> >>
>> >> On 1/8/19 12:57 PM, Peter Levart wrote:
>> >>> Hi John,
>> >>>
>> >>> On 1/8/19 12:45 PM, Peter Levart wrote:
>> >>>>> I looked at your custom transformer, and it looks almost correct to
>> >>>>> me. The
>> >>>>> only flaw seems to be that it only looks
>> >>>>> for closed windows for the key currently being processed, which
>> >>>>> means that
>> >>>>> if you have key "A" buffered, but don't get another event for it
>> for a
>> >>>>> while after the window closes, you won't emit the final result. This
>> >>>>> might
>> >>>>> actually take longer than the window retention period, in which
>> >>>>> case, the
>> >>>>> data would be deleted without ever emitting the final result.
>> >>>> So in the DSL case, the suppression works by flushing *all* of the
>> >>>> "ripe" windows in the whole buffer whenever a single event comes in
>> >>>> with a recent enough timestamp, regardless of the key of that event?
>> >>>>
>> >>>> Is the buffer shared among processing tasks or does each task
>> >>>> maintain its own private buffer that only contains its share of data
>> >>>> pertaining to assigned input partitions? In case the tasks are
>> >>>> executed on several processing JVM(s) the buffer can't really be
>> >>>> shared, right? In that case a single event can't flush all of the
>> >>>> "ripe" windows, but just those that are contained in the task's part
>> >>>> of buffer...
>> >>> Just a question about your comment above:
>> >>>
>> >>> /"This might actually take longer than the window retention period, in
>> >>> which case, the data would be deleted without ever emitting the final
>> >>> result"/
>> >>>
>> >>> Are you talking about the buffer log topic retention? Aren't log
>> >>> topics configured to "compact" rather than "delete" messages? So the
>> >>> last "version" of the buffer entry for a particular key should stay
>> >>> forever? What are the keys in suppression buffer log topic? Are they a
>> >>> pair of (timestamp, key) ? Probably not since in that case the
>> >>> compacted log would grow indefinitely...
>> >>>
>> >>> Another question:
>> >>>
>> >>> What are the keys in WindowStore's log topic? If the input keys to the
>> >>> processor that uses such WindowStore consist of a bounded set of
>> >>> values (for example user ids), would compacted log of such WindowStore
>> >>> also be bounded?
>> >> In case the key of WindowStore log topic is (timestamp, key) then would
>> >> explicitly deleting flushed entries from WindowStore (by putting null
>> >> value into the store) keep the compacted log bounded? In other words,
>> >> does WindowStore log topic support a special kind of "tombstone"
>> message
>> >> that effectively removes the key from the compacted log?
>> >>
>> >> In that case, my custom processor could keep entries in its WindowStore
>> >> for as long as needed, depending on the activity of a particular input
>> >> key...
>> >>
>> >>> Regards, Peter
>> >>>
>> >>>
>> >>
>>
>>

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi Peter,

I see your train of thought, but the actual implementation of the
window store is structured differently from your mental model.
Unlike Key/Value stores, we know that the records in a window
store will "expire" on a regular schedule, and also that every single
record will eventually expire. With this in mind, we have implemented
an optimization to avoid a lot of compaction overhead in RocksDB, as
well as saving on range scans.

Instead of storing everything in one database, we open several
databases and bucket windows into them. Then, when windows
expire, we just ignore the records (i.e., the API makes them unreachable,
but we don't actually delete them). Once all the windows in a database
are expired, we just close and delete the whole database. Then, we open
a new one for new windows. If you look in the code, these databases are
called "segments".
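
The bucketing described above can be sketched in plain Java. This is a simplified model and not Kafka's actual code: the class name, the in-memory maps standing in for the per-segment databases, and the string key encoding are all assumptions made for illustration. Expired windows become unreachable through `get` right away, but their data is only freed when the whole segment is dropped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Simplified, in-memory model of a segmented window store (timestamps assumed non-negative). */
final class SegmentedWindowSketch {
    private final long retentionMs;
    private final long segmentIntervalMs;
    // segment id -> contents of that segment (stands in for one database per segment)
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    private long streamTimeMs = 0L;

    SegmentedWindowSketch(long retentionMs, long segmentIntervalMs) {
        this.retentionMs = retentionMs;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    void put(String key, long windowStartMs, String value) {
        streamTimeMs = Math.max(streamTimeMs, windowStartMs);
        // Bucket the window into a segment by its start time.
        segments.computeIfAbsent(windowStartMs / segmentIntervalMs, id -> new HashMap<>())
                .put(key + "@" + windowStartMs, value);
        // Drop whole segments once every window they can hold is expired.
        long oldestLiveSegment = (streamTimeMs - retentionMs) / segmentIntervalMs;
        segments.headMap(oldestLiveSegment, false).clear();
    }

    String get(String key, long windowStartMs) {
        // Expired windows are ignored even if their segment still exists.
        if (windowStartMs < streamTimeMs - retentionMs) {
            return null;
        }
        Map<String, String> segment = segments.get(windowStartMs / segmentIntervalMs);
        return segment == null ? null : segment.get(key + "@" + windowStartMs);
    }

    long firstSegmentId() {
        return segments.firstKey();
    }

    public static void main(String[] args) {
        SegmentedWindowSketch store = new SegmentedWindowSketch(100L, 50L);
        store.put("A", 0L, "a0");
        store.put("B", 130L, "b130"); // window A@0 is now expired, segment 0 still exists
        System.out.println("A@0 readable: " + (store.get("A", 0L) != null)
            + ", first segment: " + store.firstSegmentId());
        store.put("B", 160L, "b160"); // every window in segment 0 is expired: segment dropped
        System.out.println("first segment: " + store.firstSegmentId());
    }
}
```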

Thus, I don't think that you should attempt to use the built-in window
stores as you described. Instead, it should be straightforward to implement
your own StateStore with a layout that's more favorable to your desired
behavior.

You should also be able to set up the changelog the way you need.
Explicitly removed entries would also get removed from the log, if
it's a compacted log.
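
To illustrate why explicit removal keeps a compacted changelog bounded, here is a toy model of compaction in plain Java. The `Entry` type and `compact` method are hypothetical and only mimic the end result: the latest value per key survives, and a key whose latest record has a null value (a "tombstone") disappears. Real log compaction runs asynchronously on the broker, and tombstones linger for a configurable time (`delete.retention.ms`) before they are dropped.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the end state of a compacted changelog topic. */
final class CompactionSketch {

    /** One changelog record; a null value is a tombstone for the key. */
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Keeps the latest value per key and removes keys whose latest record is a tombstone. */
    static Map<String, String> compact(List<Entry> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null) {
                latest.remove(e.key);
            } else {
                latest.put(e.key, e.value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
            new Entry("A@09:00", "1"),
            new Entry("B@09:00", "2"),
            new Entry("A@09:00", "3"),
            new Entry("A@09:00", null)); // explicit removal after the final result was emitted
        System.out.println(compact(log));
    }
}
```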

Actually, what you're describing is *very* similar to the implementation
for suppress. I might actually suggest that you just copy the suppression
implementation and adapt it to your needs, or at the very least, study
how it works. In doing so, you might actually discover the cause of the
bug yourself!
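
As a starting point for studying it, the core idea behind the suppression buffer, as described earlier in this thread (organize the data by timestamp instead of by key, so that every update can emit all entries that are old enough), might be sketched like this. It is a hypothetical in-memory model, not the actual implementation: the class and method names are invented, and `closeTimeMs` stands for whatever time the window is considered closed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory sketch of a buffer ordered by window close time rather than by key. */
final class TimeOrderedBufferSketch {
    // window close time -> (key -> latest buffered value)
    private final TreeMap<Long, Map<String, String>> byCloseTime = new TreeMap<>();

    /** Buffers (or overwrites) the latest value for a key in the window closing at closeTimeMs. */
    void put(long closeTimeMs, String key, String value) {
        byCloseTime.computeIfAbsent(closeTimeMs, t -> new HashMap<>()).put(key, value);
    }

    /** Removes and returns every entry whose window closed at or before streamTimeMs, for any key. */
    List<String> evictClosed(long streamTimeMs) {
        List<String> emitted = new ArrayList<>();
        NavigableMap<Long, Map<String, String>> closed = byCloseTime.headMap(streamTimeMs, true);
        for (Map.Entry<Long, Map<String, String>> window : closed.entrySet()) {
            // Sort keys only to make the output order deterministic.
            for (Map.Entry<String, String> kv : new TreeMap<>(window.getValue()).entrySet()) {
                emitted.add(kv.getKey() + "@" + window.getKey() + "=" + kv.getValue());
            }
        }
        closed.clear(); // clearing the view removes the entries from the buffer
        return emitted;
    }

    public static void main(String[] args) {
        TimeOrderedBufferSketch buffer = new TimeOrderedBufferSketch();
        buffer.put(10L, "A", "1");
        buffer.put(10L, "A", "2"); // intermediate result for A is overwritten
        buffer.put(20L, "B", "3");
        // An update for any key that advances stream time to 10 flushes A's final result.
        System.out.println(buffer.evictClosed(10L));
    }
}
```

Because eviction is driven by time and not by the key of the incoming record, a key that never receives another event still gets its final result emitted.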

I hope this helps, and thanks for your help,
-John


On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <pe...@gmail.com> wrote:

> Hi John,
>
> Thank you very much for explaining how WindowStore works. I have some
> more questions...
>
> On 1/10/19 5:33 PM, John Roesler wrote:
> > Hi Peter,
> >
> > Regarding retention, I was not referring to log retention, but to the
> > window store retention.
> > Since a new window is created every second (for example), there are in
> > principle an unbounded
> > number of windows (the longer the application runs, the more windows
> there
> > are, with no end).
> > However, we obviously can't store an infinite amount of data, so the
> window
> > definition includes
> > a retention period. By default, this is 24 hours. After the retention
> > period elapses, all of the data
> > for the window is purged to make room for new windows.
>
> Right. Would the following work for example:
>
> - configure retention of WindowStore to be "infinite"
> - explicitly remove records from the store when windows are flushed out
> - configure WindowStore log topic for compacting
>
> Something like the following:
>
>          Stores
>              .windowStoreBuilder(
>                  Stores.persistentWindowStore(
>                      storeName,
>                      // retentionPeriod: effectively "infinite" (~1000 years).
>                      // Duration.of(1000L, ChronoUnit.YEARS) would throw, because
>                      // Duration rejects units with an estimated duration.
>                      Duration.ofDays(365_000L),
>                      Duration.ofSeconds(10), // windowSize
>                      false // retainDuplicates
>                  ),
>                  keySerde, valSerde
>              )
>              .withCachingEnabled()
>              .withLoggingEnabled(
>                  Map.of(
>                      TopicConfig.CLEANUP_POLICY_CONFIG,
>                      TopicConfig.CLEANUP_POLICY_COMPACT
>                  )
>              );
>
> In the above scenario, would:
>
> - the on-disk WindowStore be kept bounded (there could be some very old
> entries in it but majority will be new - depending on the activity of
> particular input keys)
> - the log topic be kept bounded (explicitly removed entries would be
> removed from compacted log too)
>
> I'm moving away from DSL partly because I have some problems with
> suppression (which I hope we'll be able to fix) and partly because the
> DSL can't give me the complicated semantics that I need for the
> application at hand. I tried to capture what I need in a custom
> Transformer here:
>
> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>
> Your knowledge of how WindowStore works would greatly help me decide if
> this is a workable idea.
>
> >
> > So what I meant was that if you buffer some key "A" in window (Monday
> > 09:00:00) and then get
> > no further activity for A for over 24 hours, then when you do get that
> next
> > event for A, say at
> > (Tuesday 11:00:00), you'd do the scan but find nothing, since your
> buffered
> > state would already
> > have been purged from the store.
>
> Right. That would be the case when WindowStore was configured with
> default retention of 24 hours. A quick question: What does window size
> configuration for WindowStore (see above) do? Does it have to be
> synchronized with the size of windows stored in it?
>
> >
> > The way I avoided this problem for Suppression was to organize the data
> by
> > timestamp instead
> > of by key, so on *every* update I can search for all the keys that are
> old
> > enough and emit them.
> > I also don't use a window store, so I don't have to worry about the
> > retention time.
> >
> > To answer your question about the window store's topic, it configures a
> > retention time the same
> > length as the store's retention time, (and they keys are the full
> windowed
> > key including the window
> > start time), so it'll have roughly the same size bound as the store
> itself.
>
> Would explicitly removed entries from WindowStore be removed from log
> too if it was a compacting log?
>
> >
> > Back to the process of figuring out what might be wrong with
> Suppression, I
> > don't suppose you
> > would be able to file a Jira and upload a repro program? If not, that's
> ok.
> > I haven't been able to
> > reproduce the bug yet, but it seems like it's happening somewhat
> > consistently for you, so I should
> > be able to get it to happen eventually.
> >
> > Thanks, and sorry again for the troubles.
> > -John
>
> I can prepare a minimal reproducer. No problem...
>
> Regards, Peter
>
> >
> > On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com>
> wrote:
> >
> >>
> >> On 1/8/19 12:57 PM, Peter Levart wrote:
> >>> Hi John,
> >>>
> >>> On 1/8/19 12:45 PM, Peter Levart wrote:
> >>>>> I looked at your custom transfomer, and it looks almost correct to
> >>>>> me. The
> >>>>> only flaw seems to be that it only looks
> >>>>> for closed windows for the key currently being processed, which
> >>>>> means that
> >>>>> if you have key "A" buffered, but don't get another event for it for
> a
> >>>>> while after the window closes, you won't emit the final result. This
> >>>>> might
> >>>>> actually take longer than the window retention period, in which
> >>>>> case, the
> >>>>> data would be deleted without ever emitting the final result.
> >>>> So in DSL case, the suppression works by flushing *all* of the "ripe"
> >>>> windows in the whole buffer whenever a singe event comes in with
> >>>> recent enough timestamp regardless of the key of that event?
> >>>>
> >>>> Is the buffer shared among processing tasks or does each task
> >>>> maintain its own private buffer that only contains its share of data
> >>>> pertaining to assigned input partitions? In case the tasks are
> >>>> executed on several processing JVM(s) the buffer can't really be
> >>>> shared, right? In that case a single event can't flush all of the
> >>>> "ripe" windows, but just those that are contained in the task's part
> >>>> of buffer...
> >>> Just a question about your comment above:
> >>>
> >>> /"This might actually take longer than the window retention period, in
> >>> which case, the data would be deleted without ever emitting the final
> >>> result"/
> >>>
> >>> Are you talking about the buffer log topic retention? Aren't log
> >>> topics configured to "compact" rather than "delete" messages? So the
> >>> last "version" of the buffer entry for a particular key should stay
> >>> forever? What are the keys in suppression buffer log topic? Are they a
> >>> pair of (timestamp, key) ? Probably not since in that case the
> >>> compacted log would grow indefinitely...
> >>>
> >>> Another question:
> >>>
> >>> What are the keys in WindowStore's log topic? If the input keys to the
> >>> processor that uses such WindowStore consist of a bounded set of
> >>> values (for example user ids), would compacted log of such WindowStore
> >>> also be bounded?
> >> In case the key of WindowStore log topic is (timestamp, key) then would
> >> explicitly deleting flushed entries from WindowStore (by putting null
> >> value into the store) keep the compacted log bounded? In other words,
> >> does WindowStore log topic support a special kind of "tombstone" message
> >> that effectively removes the key from the compacted log?
> >>
> >> In that case, my custom processor could keep entries in its WindowStore
> >> for as log as needed, depending on the activity of a particular input
> >> key...
> >>
> >>> Regards, Peter
> >>>
> >>>
> >>
>
>

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by John Roesler <jo...@confluent.io>.
Hi Peter,

Regarding retention, I was not referring to log retention, but to the
window store retention. Since a new window is created every second (for
example), there are in principle an unbounded number of windows (the longer
the application runs, the more windows there are, with no end). However, we
obviously can't store an infinite amount of data, so the window definition
includes a retention period. By default, this is 24 hours. After the
retention period elapses, all of the data for the window is purged to make
room for new windows.

So what I meant was that if you buffer some key "A" in window (Monday
09:00:00) and then get no further activity for A for over 24 hours, then
when you do get that next event for A, say at (Tuesday 11:00:00), you'd do
the scan but find nothing, since your buffered state would already have
been purged from the store.

The way I avoided this problem for Suppression was to organize the data by
timestamp instead of by key, so on *every* update I can search for all the
keys that are old enough and emit them. I also don't use a window store, so
I don't have to worry about the retention time.
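
The idea can be sketched roughly as follows. This is not the actual
suppression code, just a minimal illustration under assumptions: the class
and method names are made up, "window close time" stands for window end plus
grace period, and stream time is simply the highest record timestamp seen.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy time-ordered buffer (hypothetical names, for illustration only). */
class TimeOrderedBufferSketch {
    // window close time -> (key -> latest buffered value), sorted by close time
    private final TreeMap<Long, Map<String, String>> byCloseTime = new TreeMap<>();
    private long streamTime = 0L;

    /** Buffers the latest value, then emits every entry whose window has closed. */
    List<String> update(String key, long windowCloseMs, long recordTimestampMs, String value) {
        streamTime = Math.max(streamTime, recordTimestampMs);
        byCloseTime.computeIfAbsent(windowCloseMs, t -> new TreeMap<>()).put(key, value);

        List<String> emitted = new ArrayList<>();
        // Because the buffer is sorted by time, all "ripe" windows form a prefix
        // of the map, regardless of which key triggered this update.
        Iterator<Map.Entry<Long, Map<String, String>>> ripe =
                byCloseTime.headMap(streamTime, true).entrySet().iterator();
        while (ripe.hasNext()) {
            Map.Entry<Long, Map<String, String>> window = ripe.next();
            window.getValue().forEach(
                    (k, v) -> emitted.add(k + "@" + window.getKey() + "=" + v));
            ripe.remove(); // the final result has been emitted, so drop it from the buffer
        }
        return emitted;
    }

    int bufferedWindows() {
        return byCloseTime.size();
    }
}
```

In this sketch an event for key "B" is enough to flush a closed window for
key "A", which is exactly what a per-key scan cannot do.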

To answer your question about the window store's topic, it configures a
retention time the same length as the store's retention time (and the keys
are the full windowed key including the window start time), so it'll have
roughly the same size bound as the store itself.

Back to the process of figuring out what might be wrong with Suppression, I
don't suppose you would be able to file a Jira and upload a repro program?
If not, that's ok. I haven't been able to reproduce the bug yet, but it
seems like it's happening somewhat consistently for you, so I should be
able to get it to happen eventually.

Thanks, and sorry again for the troubles.
-John

On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <pe...@gmail.com> wrote:

>
>
> On 1/8/19 12:57 PM, Peter Levart wrote:
> > Hi John,
> >
> > On 1/8/19 12:45 PM, Peter Levart wrote:
> >>> I looked at your custom transfomer, and it looks almost correct to
> >>> me. The
> >>> only flaw seems to be that it only looks
> >>> for closed windows for the key currently being processed, which
> >>> means that
> >>> if you have key "A" buffered, but don't get another event for it for a
> >>> while after the window closes, you won't emit the final result. This
> >>> might
> >>> actually take longer than the window retention period, in which
> >>> case, the
> >>> data would be deleted without ever emitting the final result.
> >>
> >> So in DSL case, the suppression works by flushing *all* of the "ripe"
> >> windows in the whole buffer whenever a singe event comes in with
> >> recent enough timestamp regardless of the key of that event?
> >>
> >> Is the buffer shared among processing tasks or does each task
> >> maintain its own private buffer that only contains its share of data
> >> pertaining to assigned input partitions? In case the tasks are
> >> executed on several processing JVM(s) the buffer can't really be
> >> shared, right? In that case a single event can't flush all of the
> >> "ripe" windows, but just those that are contained in the task's part
> >> of buffer...
> >
> > Just a question about your comment above:
> >
> > /"This might actually take longer than the window retention period, in
> > which case, the data would be deleted without ever emitting the final
> > result"/
> >
> > Are you talking about the buffer log topic retention? Aren't log
> > topics configured to "compact" rather than "delete" messages? So the
> > last "version" of the buffer entry for a particular key should stay
> > forever? What are the keys in suppression buffer log topic? Are they a
> > pair of (timestamp, key) ? Probably not since in that case the
> > compacted log would grow indefinitely...
> >
> > Another question:
> >
> > What are the keys in WindowStore's log topic? If the input keys to the
> > processor that uses such WindowStore consist of a bounded set of
> > values (for example user ids), would compacted log of such WindowStore
> > also be bounded?
>
> In case the key of WindowStore log topic is (timestamp, key) then would
> explicitly deleting flushed entries from WindowStore (by putting null
> value into the store) keep the compacted log bounded? In other words,
> does WindowStore log topic support a special kind of "tombstone" message
> that effectively removes the key from the compacted log?
>
> In that case, my custom processor could keep entries in its WindowStore
> for as log as needed, depending on the activity of a particular input
> key...
>
> >
> > Regards, Peter
> >
> >
>
>

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Peter Levart <pe...@gmail.com>.

On 1/8/19 12:57 PM, Peter Levart wrote:
> Hi John,
>
> On 1/8/19 12:45 PM, Peter Levart wrote:
>>> I looked at your custom transfomer, and it looks almost correct to 
>>> me. The
>>> only flaw seems to be that it only looks
>>> for closed windows for the key currently being processed, which 
>>> means that
>>> if you have key "A" buffered, but don't get another event for it for a
>>> while after the window closes, you won't emit the final result. This 
>>> might
>>> actually take longer than the window retention period, in which 
>>> case, the
>>> data would be deleted without ever emitting the final result.
>>
>> So in DSL case, the suppression works by flushing *all* of the "ripe" 
>> windows in the whole buffer whenever a singe event comes in with 
>> recent enough timestamp regardless of the key of that event?
>>
>> Is the buffer shared among processing tasks or does each task 
>> maintain its own private buffer that only contains its share of data 
>> pertaining to assigned input partitions? In case the tasks are 
>> executed on several processing JVM(s) the buffer can't really be 
>> shared, right? In that case a single event can't flush all of the 
>> "ripe" windows, but just those that are contained in the task's part 
>> of buffer... 
>
> Just a question about your comment above:
>
> /"This might actually take longer than the window retention period, in 
> which case, the data would be deleted without ever emitting the final 
> result"/
>
> Are you talking about the buffer log topic retention? Aren't log 
> topics configured to "compact" rather than "delete" messages? So the 
> last "version" of the buffer entry for a particular key should stay 
> forever? What are the keys in suppression buffer log topic? Are they a 
> pair of (timestamp, key) ? Probably not since in that case the 
> compacted log would grow indefinitely...
>
> Another question:
>
> What are the keys in WindowStore's log topic? If the input keys to the 
> processor that uses such WindowStore consist of a bounded set of 
> values (for example user ids), would compacted log of such WindowStore 
> also be bounded?

In case the key of WindowStore log topic is (timestamp, key) then would 
explicitly deleting flushed entries from WindowStore (by putting null 
value into the store) keep the compacted log bounded? In other words, 
does WindowStore log topic support a special kind of "tombstone" message 
that effectively removes the key from the compacted log?

In that case, my custom processor could keep entries in its WindowStore 
for as long as needed, depending on the activity of a particular input key...

>
> Regards, Peter
>
>


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Peter Levart <pe...@gmail.com>.
Hi John,

On 1/8/19 12:45 PM, Peter Levart wrote:
>> I looked at your custom transfomer, and it looks almost correct to 
>> me. The
>> only flaw seems to be that it only looks
>> for closed windows for the key currently being processed, which means 
>> that
>> if you have key "A" buffered, but don't get another event for it for a
>> while after the window closes, you won't emit the final result. This 
>> might
>> actually take longer than the window retention period, in which case, 
>> the
>> data would be deleted without ever emitting the final result.
>
> So in DSL case, the suppression works by flushing *all* of the "ripe" 
> windows in the whole buffer whenever a singe event comes in with 
> recent enough timestamp regardless of the key of that event?
>
> Is the buffer shared among processing tasks or does each task maintain 
> its own private buffer that only contains its share of data pertaining 
> to assigned input partitions? In case the tasks are executed on 
> several processing JVM(s) the buffer can't really be shared, right? In 
> that case a single event can't flush all of the "ripe" windows, but 
> just those that are contained in the task's part of buffer... 

Just a question about your comment above:

/"This might actually take longer than the window retention period, in 
which case, the data would be deleted without ever emitting the final 
result"/

Are you talking about the buffer log topic retention? Aren't log topics 
configured to "compact" rather than "delete" messages? So the last 
"version" of the buffer entry for a particular key should stay forever? 
What are the keys in suppression buffer log topic? Are they a pair of 
(timestamp, key) ? Probably not since in that case the compacted log 
would grow indefinitely...

Another question:

What are the keys in WindowStore's log topic? If the input keys to the 
processor that uses such WindowStore consist of a bounded set of values 
(for example user ids), would compacted log of such WindowStore also be 
bounded?

Regards, Peter


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

Posted by Peter Levart <pe...@gmail.com>.
Hi John,

On 1/7/19 9:10 PM, John Roesler wrote:
> Hi Peter,
>
> Sorry, I just now have seen this thread.
>
> You asked if this behavior is unexpected, and the answer is yes.
> Suppress.untilWindowCloses is intended to emit only the final result,
> regardless of restarts.
>
> You also asked how the suppression buffer can resume after a restart, since
> it's not persistent.
> The answer is the same as for in-memory stores. The state of the store (or
> buffer, in this case)
> is persisted to a changelog topic, which is re-read on restart to re-create
> the exact state prior to shutdown.
> "Persistent" in the store nomenclature refers only to "persistent on the
> local disk".
>
> Just to confirm your response regarding the buffer size:
> While it is better to use the public ("Suppressed.unbounded()") API, yes,
> your buffer was already unbounded.
>
> I looked at your custom transfomer, and it looks almost correct to me. The
> only flaw seems to be that it only looks
> for closed windows for the key currently being processed, which means that
> if you have key "A" buffered, but don't get another event for it for a
> while after the window closes, you won't emit the final result. This might
> actually take longer than the window retention period, in which case, the
> data would be deleted without ever emitting the final result.

So in the DSL case, the suppression works by flushing *all* of the "ripe" 
windows in the whole buffer whenever a single event comes in with a recent 
enough timestamp, regardless of the key of that event?

Is the buffer shared among processing tasks or does each task maintain 
its own private buffer that only contains its share of data pertaining 
to assigned input partitions? In case the tasks are executed on several 
processing JVM(s) the buffer can't really be shared, right? In that case 
a single event can't flush all of the "ripe" windows, but just those 
that are contained in the task's part of buffer...

>
> You said you think it should be possible to get the DSL version working,
> and I agree, since this is exactly what it was designed for. Do you mind
> filing a bug in the "KAFKA" Jira project (
> https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to
> keep the investigation organized that way.

Will do that.

>
> In the mean time, I'll take another look at your logs above and try to
> reason about what could be wrong.
>
> Just one clarification... For example, you showed
>> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
> 272, 548, 172], sum: 138902
>> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
> 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164
>
> Am I correct in thinking that the first, shorter list is the "incremental"
> version, and the second is the "final" version? I think so, but am confused
> by "INSTEAD OF".

It's the other way around. The 1st list (usually the longer one) is what 
has just been consumed and the second is what had been consumed before 
that for the same key. (I maintain a ConcurrentHashMap of consumed 
entries in the test and execute: secondList = map.put(key, firstList).)
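
A rough sketch of that bookkeeping is below. It is not the actual test code:
the class name, the method name and the exact log format are assumptions made
for illustration; only the map.put(key, firstList) step is taken from the
description above.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper that flags repeated results for the same windowed key. */
class ConsumedResultsSketch {
    private final ConcurrentHashMap<String, List<Integer>> consumed = new ConcurrentHashMap<>();

    /** Records the newly consumed list and reports what it replaced, if anything. */
    String onConsumed(String windowedKey, List<Integer> firstList) {
        // put() returns the previous value for the key, or null on first sight
        List<Integer> secondList = consumed.put(windowedKey, firstList);
        String line = "APP Consumed: [" + windowedKey + "] -> " + firstList;
        return secondList == null ? line : line + " INSTEAD OF " + secondList;
    }
}
```

With suppression working as intended, each windowed key would be seen only
once, so an "INSTEAD OF" line would never be printed.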

In the majority of cases, the consumed list is an incremental update of some 
previous version of the list (not necessarily a direct descendant) 
consumed before that, but as said, I also observed the final window 
result before a processor restart and, after the restart, some previous 
non-final version of the window aggregation for the same key.

May I also note that there is some "jitter" in the input timestamps 
because I'm trying to model a real use case where there will be several 
inputs to the system with only approximately synchronized clocks. The 
jitter is kept well below the TimeWindow grace period so there should be 
no events consumed by the processor that belong to windows that have 
already been flushed.

Regards, Peter

>
> Thanks for the report,
> -John
>
>
>
> On Wed, Dec 26, 2018 at 3:21 AM Peter Levart <pe...@gmail.com> wrote:
>
>>
>> On 12/21/18 3:16 PM, Peter Levart wrote:
>>> I also see some results that are actual non-final window aggregations
>>> that precede the final aggregations. These non-final results are never
>>> emitted out of order (for example, no such non-final result would ever
>>> come after the final result for a particular key/window).
>> Absence of proof is not the proof of absence... And I have later
>> observed (using the DSL variant, not the custom Transformer) an
>> occurrence of a non-final result that was emitted after restart of
>> streams processor while the final result for the same key/window had
>> been emitted before the restart:
>>
>> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550,
>> 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
>> ...
>> ... restart ...
>> ...
>> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550]
>> INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648
>>
>>
>> The app logic can not even rely on guarantee that results are ordered
>> then. This is really not usable until the bug is fixed.
>>
>> Regards, Peter
>>
>>