You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Dmitry Minkovsky <dm...@gmail.com> on 2018/01/16 23:08:00 UTC

Kafka Streams topology does not replay correctly

Earlier today I posted this question to SO
<https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>
:

> I have a topology that looks like this:

    KTable<ByteString, User> users = topology.table(USERS,
Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));

    KStream<ByteString, JoinRequest> joinRequests =
topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
joinRequestSerde))
        .mapValues(entityTopologyProcessor::userNew)
        .to(USERS, Produced.with(byteStringSerde, userSerde));

    topology.stream(SETTINGS_CONFIRM_REQUESTS,
Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
        .join(users, entityTopologyProcessor::userSettingsConfirm,
Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
        .to(USERS, Produced.with(byteStringSerde, userSerde));

    topology.stream(SETTINGS_UPDATE_REQUESTS,
Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
        .join(users, entityTopologyProcessor::userSettingsUpdate,
Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
        .to(USERS, Produced.with(byteStringSerde, userSerde));

> At runtime this topology works fine. Users are created with join
requests. They confirm their settings with settings confirm requests. They
update their settings with settings update requests.
>
> However, reprocessing this topology does not produce the original
results. Specifically, the settings update joiner does not see the user
that resulted from the settings confirm joiner, even though in terms of
timestamps, many seconds elapse from the time the user is created, to the
time the user is confirmed to the time the user updates their settings.
>
> I'm at a loss. I've tried turning off caching/logging on the user table.
No idea what to do to make this reprocess properly.

----

The response by Matthias, also on SO:

> A KStream-KTable join is not 100% deterministic (and might never become
100% deterministic). We are aware of the problem and discuss solutions, to
at least mitigate the issue.
>
> One problem is, that if a Consumer fetches from the brokers, we cannot
control easily for which topics and/or partitions the broker returns data.
And depending on the order in which we receive data from the broker, the
result might slightly differ.
>
> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>
> This blog post might help, too:
https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

----

I don't really know what to do with this response. I have been aware of
some "slight" discrepancy that might occur in edge cases with
KStream-KTable joins for some time now, but what I'm seeing is not a slight
discrepancy but very different results.

I looked at the JIRA Matthias linked
<https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data has no
late arriving records. I don't know about the empty buffers. I have read
the blog post he linked several times already.

Can someone please suggest how I may obviate this problem? For example

   - Would it make sense for me to try launching the topology with fewer
   threads during the reprocess?
   - Would it make sense for launch the topology with fewer input tasks?
   - Would it make sense to increase size of the stream buffer?

I am at a total loss at this point. I cannot believe that there is nothing
I can do to replay this data and perform the migration I am trying to
perform, in order to release a next version of my application. Am I totally
screwed?


Thank you,
Dmitry

Re: Kafka Streams topology does not replay correctly

Posted by Dmitry Minkovsky <dm...@gmail.com>.
> however the question remains, what to do when one partition does not
have any data (ie, your are in the real-time case processing at the head
of the topics)? Just process (with the risk that we get the order
wrong?) or block until we get data for all partitions (But for how long
to block/delay? -- Blocking could have large impact on overall behavior).

Yes, these are the very issues I am facing in attempting to fix my
topology.

Here is my topology, if you are wondering: https://gist.github.com/
dminkovsky/75648a44aecafcf95e679ed8bd780aca. It powers https://www.pony.gg.
You see the crux of every single transformation is taking event input
streams and joining them against some accumulated state. This is fine in
real time, but everything needs to be synchronized to produce correct
results on reprocess.

Since I'm not in a position to hack on the framework itself, I've been
attempting to re-write my topology to synchronize streams using the API.
It's been sort of a nightmare for the reasons you cite above.

My conclusion has been that a huge gap in foresight lead me to modeling
incorrectly with this toolset. If I wanted my data to be easily
re-processable, I would have put all relevant events in order onto
a  single topic-partition. Then there would be no need for input
stream/accumulated state synchronization upon reprocess. Of course then my
topology would not look anywhere as neat as it does today. But it turns out
that my topology as it looks today is basically worthless.

What a conundrum!


Dmitry










On Wed, Jan 17, 2018 at 1:48 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> We will fix this. No worries. And it is basically only a problem for
> Stream-Table join...
>
> All other operators don't "suffer" from the current behavior.
>
> I think in the end, we need to do multiple poll() calls and pause
> partitions to make sure the internal RecordQueue has data for all input
> partitions -- this allows us to pick the next record to be processed
> deterministic over all topics -- for reprocessing, this solve the issue
> -- however the question remains, what to do when one partition does not
> have any data (ie, your are in the real-time case processing at the head
> of the topics)? Just process (with the risk that we get the order
> wrong?) or block until we get data for all partitions (But for how long
> to block/delay? -- Blocking could have large impact on overall behavior).
>
> The detail are still under discussion... Hope this helps.
>
>
> -Matthias
>
> On 1/17/18 9:10 AM, Dmitry Minkovsky wrote:
> > I have read through the source, following StreamThread ->
> > AssignedStreamTasks -> StreamTask -> PartitionGroup -> RecordQueue. I
> see,
> > as you said, that everything depends on what records the consumer
> initially
> > returns from poll.
> >
> > I wonder how this problem might be solved in the bigger picture.
> Certainly
> > I'm not the only person who is interested in reprocessing their history.
> > For me that was in the top 3 coolest things about the stream-based
> approach
> > to modeling an application. If you can't reprocess your application, it's
> > almost not worth modeling it this way.
> >
> > Now, certainly I will find ways to make adjustments to make this
> > re-processable. But it would be cool if this was a first-class things in
> > the framework. It seems like it would require the user to specify the
> > runtime dependencies between streams.
> >
> > On Wed, Jan 17, 2018 at 8:25 AM, Dmitry Minkovsky <dm...@gmail.com>
> > wrote:
> >
> >>> That depends what data the consumer fetches and this part is hard to
> >> predict. For this reason, you need to buffer multiple records in a
> >> store, in case data does not arrive in the order and you need it
> >> (between different topics) and later do the processing in the correct
> >> order when you got all data you need. Does this make sense?
> >>
> >> I understand. Thanks for the explanation. That’s what I concluded when I
> >> was wondering you were talking about buffers. All this time I though the
> >> StreamThread did this to some extent. Fundamental misconception on my
> part.
> >> So the “best effort” synchronization doesn’t apply at all across
> >> independent streams? Only applies in the case of joins? Does it apply in
> >> the case of merge?
> >>
> >> Thank you,
> >> Dmitry
> >>
> >>
> >> ср, 17 янв. 2018 г. в 2:39, Matthias J. Sax <ma...@confluent.io>:
> >>
> >>>>>> The KStream has incoming events, and #transform() will
> >>>>>> let me mount the store and use it how I please. Within an
> application
> >>>>>> instance, any other KStream#transform()s using the same store will
> >>> see the
> >>>>>> same data in real time.
> >>>
> >>> That sounds basically correct. But you don't know the order (between
> >>> different topics) in which you will receive the data.
> >>>
> >>>>>> Will the topology call the join transform before the
> settings-confirm
> >>>>>> transform before the settings-update transform?
> >>>
> >>> That depends what data the consumer fetches and this part is hard to
> >>> predict. For this reason, you need to buffer multiple records in a
> >>> store, in case data does not arrive in the order and you need it
> >>> (between different topics) and later do the processing in the correct
> >>> order when you got all data you need. Does this make sense?
> >>>
> >>> This is the underlying problem for KStream-KTable join, too. If might
> >>> happen hat we get 100 KTable records that we all process before we
> >>> receive 100 KStream records. For the correct result it might be
> required
> >>> to get 50 KTable and 50 KStream in the first poll call and the rest in
> >>> the second. But we don't know and just process whatever we get.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 1/16/18 7:14 PM, Dmitry Minkovsky wrote:
> >>>> I meant “Thanks, yes I will try replacing...”
> >>>>
> >>>> вт, 16 янв. 2018 г. в 22:12, Dmitry Minkovsky <dm...@gmail.com>:
> >>>>
> >>>>> Thanks, yes try replacing the KStream-KTable joins with
> >>>>> KStream#transform()s and a store. Not sure why you mean I’d need to
> >>> buffer
> >>>>> multiple records. The KStream has incoming events, and #transform()
> >>> will
> >>>>> let me mount the store and use it how I please. Within an application
> >>>>> instance, any other KStream#transform()s using the same store will
> see
> >>> the
> >>>>> same data in real time.
> >>>>>
> >>>>> Now suppose I have three topics, each with events like this, each on
> >>> their
> >>>>> own KStream:
> >>>>>
> >>>>> T1 join
> >>>>> T2 settings-confirm
> >>>>> T3 settings-update
> >>>>>
> >>>>> Will the topology call the join transform before the settings-confirm
> >>>>> transform before the settings-update transform?
> >>>>>
> >>>>>
> >>>>>
> >>>>> вт, 16 янв. 2018 г. в 21:39, Matthias J. Sax <matthias@confluent.io
> >:
> >>>>>
> >>>>>> You have more flexibility of course and thus can get better results.
> >>> But
> >>>>>> your code must be able to buffer multiple records from the KTable
> and
> >>>>>> KStream input and also store the corresponding timestamps to perform
> >>> the
> >>>>>> join correctly. It's not trivial but also also not rocket-science.
> >>>>>>
> >>>>>> If we need stronger guarantees, it's the best way to follow though
> >>> atm,
> >>>>>> until we have addressed those issues. Planned for 1.2.0 release.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> >>>>>>> Right now I am thinking of re-writing anything that has these
> >>>>>> problematic
> >>>>>>> KStream/KTable joins as KStream#transform() wherein the state store
> >>> is
> >>>>>>> manually used. Does that makes sense as an option for me?
> >>>>>>>
> >>>>>>> -Dmitry
> >>>>>>>
> >>>>>>> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <
> >>> dminkovsky@gmail.com
> >>>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Earlier today I posted this question to SO
> >>>>>>>> <
> >>>>>> https://stackoverflow.com/questions/48287840/kafka-
> >>> streams-topology-does-not-replay-correctly
> >>>>>>>
> >>>>>>>> :
> >>>>>>>>
> >>>>>>>>> I have a topology that looks like this:
> >>>>>>>>
> >>>>>>>>     KTable<ByteString, User> users = topology.table(USERS,
> >>>>>>>> Consumed.with(byteStringSerde, userSerde),
> Materialized.as(USERS));
> >>>>>>>>
> >>>>>>>>     KStream<ByteString, JoinRequest> joinRequests =
> >>>>>>>> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
> >>>>>>>> joinRequestSerde))
> >>>>>>>>         .mapValues(entityTopologyProcessor::userNew)
> >>>>>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>>>>>>>
> >>>>>>>>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
> >>>>>>>> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> >>>>>>>>         .join(users, entityTopologyProcessor::
> userSettingsConfirm,
> >>>>>>>> Joined.with(byteStringSerde, settingsConfirmRequestSerde,
> >>> userSerde))
> >>>>>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>>>>>>>
> >>>>>>>>     topology.stream(SETTINGS_UPDATE_REQUESTS,
> >>>>>>>> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> >>>>>>>>         .join(users, entityTopologyProcessor::userSettingsUpdate,
> >>>>>>>> Joined.with(byteStringSerde, settingsUpdateRequestSerde,
> userSerde))
> >>>>>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>>>>>>>
> >>>>>>>>> At runtime this topology works fine. Users are created with join
> >>>>>>>> requests. They confirm their settings with settings confirm
> >>> requests.
> >>>>>> They
> >>>>>>>> update their settings with settings update requests.
> >>>>>>>>>
> >>>>>>>>> However, reprocessing this topology does not produce the original
> >>>>>>>> results. Specifically, the settings update joiner does not see the
> >>> user
> >>>>>>>> that resulted from the settings confirm joiner, even though in
> >>> terms of
> >>>>>>>> timestamps, many seconds elapse from the time the user is created,
> >>> to
> >>>>>> the
> >>>>>>>> time the user is confirmed to the time the user updates their
> >>> settings.
> >>>>>>>>>
> >>>>>>>>> I'm at a loss. I've tried turning off caching/logging on the user
> >>>>>> table.
> >>>>>>>> No idea what to do to make this reprocess properly.
> >>>>>>>>
> >>>>>>>> ----
> >>>>>>>>
> >>>>>>>> The response by Matthias, also on SO:
> >>>>>>>>
> >>>>>>>>> A KStream-KTable join is not 100% deterministic (and might never
> >>>>>> become
> >>>>>>>> 100% deterministic). We are aware of the problem and discuss
> >>>>>> solutions, to
> >>>>>>>> at least mitigate the issue.
> >>>>>>>>>
> >>>>>>>>> One problem is, that if a Consumer fetches from the brokers, we
> >>> cannot
> >>>>>>>> control easily for which topics and/or partitions the broker
> returns
> >>>>>> data.
> >>>>>>>> And depending on the order in which we receive data from the
> broker,
> >>>>>> the
> >>>>>>>> result might slightly differ.
> >>>>>>>>>
> >>>>>>>>> One related issue: https://issues.apache.org/
> >>> jira/browse/KAFKA-3514
> >>>>>>>>>
> >>>>>>>>> This blog post might help, too: https://www.confluent.io/blog/
> >>>>>>>> crossing-streams-joins-apache-kafka/
> >>>>>>>>
> >>>>>>>> ----
> >>>>>>>>
> >>>>>>>> I don't really know what to do with this response. I have been
> >>> aware of
> >>>>>>>> some "slight" discrepancy that might occur in edge cases with
> >>>>>>>> KStream-KTable joins for some time now, but what I'm seeing is
> not a
> >>>>>> slight
> >>>>>>>> discrepancy but very different results.
> >>>>>>>>
> >>>>>>>> I looked at the JIRA Matthias linked
> >>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my
> >>> data
> >>>>>> has
> >>>>>>>> no late arriving records. I don't know about the empty buffers. I
> >>> have
> >>>>>> read
> >>>>>>>> the blog post he linked several times already.
> >>>>>>>>
> >>>>>>>> Can someone please suggest how I may obviate this problem? For
> >>> example
> >>>>>>>>
> >>>>>>>>    - Would it make sense for me to try launching the topology with
> >>>>>> fewer
> >>>>>>>>    threads during the reprocess?
> >>>>>>>>    - Would it make sense for launch the topology with fewer input
> >>>>>> tasks?
> >>>>>>>>    - Would it make sense to increase size of the stream buffer?
> >>>>>>>>
> >>>>>>>> I am at a total loss at this point. I cannot believe that there is
> >>>>>> nothing
> >>>>>>>> I can do to replay this data and perform the migration I am trying
> >>> to
> >>>>>>>> perform, in order to release a next version of my application. Am
> I
> >>>>>> totally
> >>>>>>>> screwed?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thank you,
> >>>>>>>> Dmitry
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>
> >>>
> >
>
>

Re: Kafka Streams topology does not replay correctly

Posted by "Matthias J. Sax" <ma...@confluent.io>.
We will fix this. No worries. And it is basically only a problem for
Stream-Table join...

All other operators don't "suffer" from the current behavior.

I think in the end, we need to do multiple poll() calls and pause
partitions to make sure the internal RecordQueue has data for all input
partitions -- this allows us to pick the next record to be processed
deterministic over all topics -- for reprocessing, this solve the issue
-- however the question remains, what to do when one partition does not
have any data (ie, your are in the real-time case processing at the head
of the topics)? Just process (with the risk that we get the order
wrong?) or block until we get data for all partitions (But for how long
to block/delay? -- Blocking could have large impact on overall behavior).

The detail are still under discussion... Hope this helps.


-Matthias

On 1/17/18 9:10 AM, Dmitry Minkovsky wrote:
> I have read through the source, following StreamThread ->
> AssignedStreamTasks -> StreamTask -> PartitionGroup -> RecordQueue. I see,
> as you said, that everything depends on what records the consumer initially
> returns from poll.
> 
> I wonder how this problem might be solved in the bigger picture. Certainly
> I'm not the only person who is interested in reprocessing their history.
> For me that was in the top 3 coolest things about the stream-based approach
> to modeling an application. If you can't reprocess your application, it's
> almost not worth modeling it this way.
> 
> Now, certainly I will find ways to make adjustments to make this
> re-processable. But it would be cool if this was a first-class things in
> the framework. It seems like it would require the user to specify the
> runtime dependencies between streams.
> 
> On Wed, Jan 17, 2018 at 8:25 AM, Dmitry Minkovsky <dm...@gmail.com>
> wrote:
> 
>>> That depends what data the consumer fetches and this part is hard to
>> predict. For this reason, you need to buffer multiple records in a
>> store, in case data does not arrive in the order and you need it
>> (between different topics) and later do the processing in the correct
>> order when you got all data you need. Does this make sense?
>>
>> I understand. Thanks for the explanation. That’s what I concluded when I
>> was wondering you were talking about buffers. All this time I though the
>> StreamThread did this to some extent. Fundamental misconception on my part.
>> So the “best effort” synchronization doesn’t apply at all across
>> independent streams? Only applies in the case of joins? Does it apply in
>> the case of merge?
>>
>> Thank you,
>> Dmitry
>>
>>
>> ср, 17 янв. 2018 г. в 2:39, Matthias J. Sax <ma...@confluent.io>:
>>
>>>>>> The KStream has incoming events, and #transform() will
>>>>>> let me mount the store and use it how I please. Within an application
>>>>>> instance, any other KStream#transform()s using the same store will
>>> see the
>>>>>> same data in real time.
>>>
>>> That sounds basically correct. But you don't know the order (between
>>> different topics) in which you will receive the data.
>>>
>>>>>> Will the topology call the join transform before the settings-confirm
>>>>>> transform before the settings-update transform?
>>>
>>> That depends what data the consumer fetches and this part is hard to
>>> predict. For this reason, you need to buffer multiple records in a
>>> store, in case data does not arrive in the order and you need it
>>> (between different topics) and later do the processing in the correct
>>> order when you got all data you need. Does this make sense?
>>>
>>> This is the underlying problem for KStream-KTable join, too. If might
>>> happen hat we get 100 KTable records that we all process before we
>>> receive 100 KStream records. For the correct result it might be required
>>> to get 50 KTable and 50 KStream in the first poll call and the rest in
>>> the second. But we don't know and just process whatever we get.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 1/16/18 7:14 PM, Dmitry Minkovsky wrote:
>>>> I meant “Thanks, yes I will try replacing...”
>>>>
>>>> вт, 16 янв. 2018 г. в 22:12, Dmitry Minkovsky <dm...@gmail.com>:
>>>>
>>>>> Thanks, yes try replacing the KStream-KTable joins with
>>>>> KStream#transform()s and a store. Not sure why you mean I’d need to
>>> buffer
>>>>> multiple records. The KStream has incoming events, and #transform()
>>> will
>>>>> let me mount the store and use it how I please. Within an application
>>>>> instance, any other KStream#transform()s using the same store will see
>>> the
>>>>> same data in real time.
>>>>>
>>>>> Now suppose I have three topics, each with events like this, each on
>>> their
>>>>> own KStream:
>>>>>
>>>>> T1 join
>>>>> T2 settings-confirm
>>>>> T3 settings-update
>>>>>
>>>>> Will the topology call the join transform before the settings-confirm
>>>>> transform before the settings-update transform?
>>>>>
>>>>>
>>>>>
>>>>> вт, 16 янв. 2018 г. в 21:39, Matthias J. Sax <ma...@confluent.io>:
>>>>>
>>>>>> You have more flexibility of course and thus can get better results.
>>> But
>>>>>> your code must be able to buffer multiple records from the KTable and
>>>>>> KStream input and also store the corresponding timestamps to perform
>>> the
>>>>>> join correctly. It's not trivial but also also not rocket-science.
>>>>>>
>>>>>> If we need stronger guarantees, it's the best way to follow though
>>> atm,
>>>>>> until we have addressed those issues. Planned for 1.2.0 release.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
>>>>>>> Right now I am thinking of re-writing anything that has these
>>>>>> problematic
>>>>>>> KStream/KTable joins as KStream#transform() wherein the state store
>>> is
>>>>>>> manually used. Does that makes sense as an option for me?
>>>>>>>
>>>>>>> -Dmitry
>>>>>>>
>>>>>>> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <
>>> dminkovsky@gmail.com
>>>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Earlier today I posted this question to SO
>>>>>>>> <
>>>>>> https://stackoverflow.com/questions/48287840/kafka-
>>> streams-topology-does-not-replay-correctly
>>>>>>>
>>>>>>>> :
>>>>>>>>
>>>>>>>>> I have a topology that looks like this:
>>>>>>>>
>>>>>>>>     KTable<ByteString, User> users = topology.table(USERS,
>>>>>>>> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>>>>>>>>
>>>>>>>>     KStream<ByteString, JoinRequest> joinRequests =
>>>>>>>> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
>>>>>>>> joinRequestSerde))
>>>>>>>>         .mapValues(entityTopologyProcessor::userNew)
>>>>>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>>>>>>>
>>>>>>>>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
>>>>>>>> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>>>>>>>>         .join(users, entityTopologyProcessor::userSettingsConfirm,
>>>>>>>> Joined.with(byteStringSerde, settingsConfirmRequestSerde,
>>> userSerde))
>>>>>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>>>>>>>
>>>>>>>>     topology.stream(SETTINGS_UPDATE_REQUESTS,
>>>>>>>> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>>>>>>>>         .join(users, entityTopologyProcessor::userSettingsUpdate,
>>>>>>>> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>>>>>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>>>>>>>
>>>>>>>>> At runtime this topology works fine. Users are created with join
>>>>>>>> requests. They confirm their settings with settings confirm
>>> requests.
>>>>>> They
>>>>>>>> update their settings with settings update requests.
>>>>>>>>>
>>>>>>>>> However, reprocessing this topology does not produce the original
>>>>>>>> results. Specifically, the settings update joiner does not see the
>>> user
>>>>>>>> that resulted from the settings confirm joiner, even though in
>>> terms of
>>>>>>>> timestamps, many seconds elapse from the time the user is created,
>>> to
>>>>>> the
>>>>>>>> time the user is confirmed to the time the user updates their
>>> settings.
>>>>>>>>>
>>>>>>>>> I'm at a loss. I've tried turning off caching/logging on the user
>>>>>> table.
>>>>>>>> No idea what to do to make this reprocess properly.
>>>>>>>>
>>>>>>>> ----
>>>>>>>>
>>>>>>>> The response by Matthias, also on SO:
>>>>>>>>
>>>>>>>>> A KStream-KTable join is not 100% deterministic (and might never
>>>>>> become
>>>>>>>> 100% deterministic). We are aware of the problem and discuss
>>>>>> solutions, to
>>>>>>>> at least mitigate the issue.
>>>>>>>>>
>>>>>>>>> One problem is, that if a Consumer fetches from the brokers, we
>>> cannot
>>>>>>>> control easily for which topics and/or partitions the broker returns
>>>>>> data.
>>>>>>>> And depending on the order in which we receive data from the broker,
>>>>>> the
>>>>>>>> result might slightly differ.
>>>>>>>>>
>>>>>>>>> One related issue: https://issues.apache.org/
>>> jira/browse/KAFKA-3514
>>>>>>>>>
>>>>>>>>> This blog post might help, too: https://www.confluent.io/blog/
>>>>>>>> crossing-streams-joins-apache-kafka/
>>>>>>>>
>>>>>>>> ----
>>>>>>>>
>>>>>>>> I don't really know what to do with this response. I have been
>>> aware of
>>>>>>>> some "slight" discrepancy that might occur in edge cases with
>>>>>>>> KStream-KTable joins for some time now, but what I'm seeing is not a
>>>>>> slight
>>>>>>>> discrepancy but very different results.
>>>>>>>>
>>>>>>>> I looked at the JIRA Matthias linked
>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my
>>> data
>>>>>> has
>>>>>>>> no late arriving records. I don't know about the empty buffers. I
>>> have
>>>>>> read
>>>>>>>> the blog post he linked several times already.
>>>>>>>>
>>>>>>>> Can someone please suggest how I may obviate this problem? For
>>> example
>>>>>>>>
>>>>>>>>    - Would it make sense for me to try launching the topology with
>>>>>> fewer
>>>>>>>>    threads during the reprocess?
>>>>>>>>    - Would it make sense for launch the topology with fewer input
>>>>>> tasks?
>>>>>>>>    - Would it make sense to increase size of the stream buffer?
>>>>>>>>
>>>>>>>> I am at a total loss at this point. I cannot believe that there is
>>>>>> nothing
>>>>>>>> I can do to replay this data and perform the migration I am trying
>>> to
>>>>>>>> perform, in order to release a next version of my application. Am I
>>>>>> totally
>>>>>>>> screwed?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thank you,
>>>>>>>> Dmitry
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>>
> 


Re: Kafka Streams topology does not replay correctly

Posted by Dmitry Minkovsky <dm...@gmail.com>.
I have read through the source, following StreamThread ->
AssignedStreamTasks -> StreamTask -> PartitionGroup -> RecordQueue. I see,
as you said, that everything depends on what records the consumer initially
returns from poll.

I wonder how this problem might be solved in the bigger picture. Certainly
I'm not the only person who is interested in reprocessing their history.
For me that was in the top 3 coolest things about the stream-based approach
to modeling an application. If you can't reprocess your application, it's
almost not worth modeling it this way.

Now, certainly I will find ways to make adjustments to make this
re-processable. But it would be cool if this was a first-class things in
the framework. It seems like it would require the user to specify the
runtime dependencies between streams.

On Wed, Jan 17, 2018 at 8:25 AM, Dmitry Minkovsky <dm...@gmail.com>
wrote:

> > That depends what data the consumer fetches and this part is hard to
> predict. For this reason, you need to buffer multiple records in a
> store, in case data does not arrive in the order and you need it
> (between different topics) and later do the processing in the correct
> order when you got all data you need. Does this make sense?
>
> I understand. Thanks for the explanation. That’s what I concluded when I
> was wondering you were talking about buffers. All this time I though the
> StreamThread did this to some extent. Fundamental misconception on my part.
> So the “best effort” synchronization doesn’t apply at all across
> independent streams? Only applies in the case of joins? Does it apply in
> the case of merge?
>
> Thank you,
> Dmitry
>
>
> ср, 17 янв. 2018 г. в 2:39, Matthias J. Sax <ma...@confluent.io>:
>
>> >>> The KStream has incoming events, and #transform() will
>> >>> let me mount the store and use it how I please. Within an application
>> >>> instance, any other KStream#transform()s using the same store will
>> see the
>> >>> same data in real time.
>>
>> That sounds basically correct. But you don't know the order (between
>> different topics) in which you will receive the data.
>>
>> >>> Will the topology call the join transform before the settings-confirm
>> >>> transform before the settings-update transform?
>>
>> That depends what data the consumer fetches and this part is hard to
>> predict. For this reason, you need to buffer multiple records in a
>> store, in case data does not arrive in the order and you need it
>> (between different topics) and later do the processing in the correct
>> order when you got all data you need. Does this make sense?
>>
>> This is the underlying problem for KStream-KTable join, too. If might
>> happen hat we get 100 KTable records that we all process before we
>> receive 100 KStream records. For the correct result it might be required
>> to get 50 KTable and 50 KStream in the first poll call and the rest in
>> the second. But we don't know and just process whatever we get.
>>
>>
>> -Matthias
>>
>>
>> On 1/16/18 7:14 PM, Dmitry Minkovsky wrote:
>> > I meant “Thanks, yes I will try replacing...”
>> >
>> > вт, 16 янв. 2018 г. в 22:12, Dmitry Minkovsky <dm...@gmail.com>:
>> >
>> >> Thanks, yes try replacing the KStream-KTable joins with
>> >> KStream#transform()s and a store. Not sure why you mean I’d need to
>> buffer
>> >> multiple records. The KStream has incoming events, and #transform()
>> will
>> >> let me mount the store and use it how I please. Within an application
>> >> instance, any other KStream#transform()s using the same store will see
>> the
>> >> same data in real time.
>> >>
>> >> Now suppose I have three topics, each with events like this, each on
>> their
>> >> own KStream:
>> >>
>> >> T1 join
>> >> T2 settings-confirm
>> >> T3 settings-update
>> >>
>> >> Will the topology call the join transform before the settings-confirm
>> >> transform before the settings-update transform?
>> >>
>> >>
>> >>
>> >> вт, 16 янв. 2018 г. в 21:39, Matthias J. Sax <ma...@confluent.io>:
>> >>
>> >>> You have more flexibility of course and thus can get better results.
>> But
>> >>> your code must be able to buffer multiple records from the KTable and
>> >>> KStream input and also store the corresponding timestamps to perform
>> the
>> >>> join correctly. It's not trivial but also also not rocket-science.
>> >>>
>> >>> If we need stronger guarantees, it's the best way to follow though
>> atm,
>> >>> until we have addressed those issues. Planned for 1.2.0 release.
>> >>>
>> >>> -Matthias
>> >>>
>> >>>
>> >>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
>> >>>> Right now I am thinking of re-writing anything that has these
>> >>> problematic
>> >>>> KStream/KTable joins as KStream#transform() wherein the state store
>> is
>> >>>> manually used. Does that makes sense as an option for me?
>> >>>>
>> >>>> -Dmitry
>> >>>>
>> >>>> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <
>> dminkovsky@gmail.com
>> >>>>
>> >>>> wrote:
>> >>>>
>> >>>>> Earlier today I posted this question to SO
>> >>>>> <
>> >>> https://stackoverflow.com/questions/48287840/kafka-
>> streams-topology-does-not-replay-correctly
>> >>>>
>> >>>>> :
>> >>>>>
>> >>>>>> I have a topology that looks like this:
>> >>>>>
>> >>>>>     KTable<ByteString, User> users = topology.table(USERS,
>> >>>>> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>> >>>>>
>> >>>>>     KStream<ByteString, JoinRequest> joinRequests =
>> >>>>> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
>> >>>>> joinRequestSerde))
>> >>>>>         .mapValues(entityTopologyProcessor::userNew)
>> >>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>>>>
>> >>>>>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
>> >>>>> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>> >>>>>         .join(users, entityTopologyProcessor::userSettingsConfirm,
>> >>>>> Joined.with(byteStringSerde, settingsConfirmRequestSerde,
>> userSerde))
>> >>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>>>>
>> >>>>>     topology.stream(SETTINGS_UPDATE_REQUESTS,
>> >>>>> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>> >>>>>         .join(users, entityTopologyProcessor::userSettingsUpdate,
>> >>>>> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>> >>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>>>>
>> >>>>>> At runtime this topology works fine. Users are created with join
>> >>>>> requests. They confirm their settings with settings confirm
>> requests.
>> >>> They
>> >>>>> update their settings with settings update requests.
>> >>>>>>
>> >>>>>> However, reprocessing this topology does not produce the original
>> >>>>> results. Specifically, the settings update joiner does not see the
>> user
>> >>>>> that resulted from the settings confirm joiner, even though in
>> terms of
>> >>>>> timestamps, many seconds elapse from the time the user is created,
>> to
>> >>> the
>> >>>>> time the user is confirmed to the time the user updates their
>> settings.
>> >>>>>>
>> >>>>>> I'm at a loss. I've tried turning off caching/logging on the user
>> >>> table.
>> >>>>> No idea what to do to make this reprocess properly.
>> >>>>>
>> >>>>> ----
>> >>>>>
>> >>>>> The response by Matthias, also on SO:
>> >>>>>
>> >>>>>> A KStream-KTable join is not 100% deterministic (and might never
>> >>> become
>> >>>>> 100% deterministic). We are aware of the problem and discuss
>> >>> solutions, to
>> >>>>> at least mitigate the issue.
>> >>>>>>
>> >>>>>> One problem is, that if a Consumer fetches from the brokers, we
>> cannot
>> >>>>> control easily for which topics and/or partitions the broker returns
>> >>> data.
>> >>>>> And depending on the order in which we receive data from the broker,
>> >>> the
>> >>>>> result might slightly differ.
>> >>>>>>
>> >>>>>> One related issue: https://issues.apache.org/
>> jira/browse/KAFKA-3514
>> >>>>>>
>> >>>>>> This blog post might help, too: https://www.confluent.io/blog/
>> >>>>> crossing-streams-joins-apache-kafka/
>> >>>>>
>> >>>>> ----
>> >>>>>
>> >>>>> I don't really know what to do with this response. I have been
>> aware of
>> >>>>> some "slight" discrepancy that might occur in edge cases with
>> >>>>> KStream-KTable joins for some time now, but what I'm seeing is not a
>> >>> slight
>> >>>>> discrepancy but very different results.
>> >>>>>
>> >>>>> I looked at the JIRA Matthias linked
>> >>>>> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my
>> data
>> >>> has
>> >>>>> no late arriving records. I don't know about the empty buffers. I
>> have
>> >>> read
>> >>>>> the blog post he linked several times already.
>> >>>>>
>> >>>>> Can someone please suggest how I may obviate this problem? For
>> example
>> >>>>>
>> >>>>>    - Would it make sense for me to try launching the topology with
>> >>> fewer
>> >>>>>    threads during the reprocess?
>> >>>>>    - Would it make sense for launch the topology with fewer input
>> >>> tasks?
>> >>>>>    - Would it make sense to increase size of the stream buffer?
>> >>>>>
>> >>>>> I am at a total loss at this point. I cannot believe that there is
>> >>> nothing
>> >>>>> I can do to replay this data and perform the migration I am trying
>> to
>> >>>>> perform, in order to release a next version of my application. Am I
>> >>> totally
>> >>>>> screwed?
>> >>>>>
>> >>>>>
>> >>>>> Thank you,
>> >>>>> Dmitry
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >
>>
>>

Re: Kafka Streams topology does not replay correctly

Posted by Dmitry Minkovsky <dm...@gmail.com>.
> That depends what data the consumer fetches and this part is hard to
predict. For this reason, you need to buffer multiple records in a
store, in case data does not arrive in the order and you need it
(between different topics) and later do the processing in the correct
order when you got all data you need. Does this make sense?

I understand. Thanks for the explanation. That’s what I concluded when I
was wondering you were talking about buffers. All this time I though the
StreamThread did this to some extent. Fundamental misconception on my part.
So the “best effort” synchronization doesn’t apply at all across
independent streams? Only applies in the case of joins? Does it apply in
the case of merge?

Thank you,
Dmitry


ср, 17 янв. 2018 г. в 2:39, Matthias J. Sax <ma...@confluent.io>:

> >>> The KStream has incoming events, and #transform() will
> >>> let me mount the store and use it how I please. Within an application
> >>> instance, any other KStream#transform()s using the same store will see
> the
> >>> same data in real time.
>
> That sounds basically correct. But you don't know the order (between
> different topics) in which you will receive the data.
>
> >>> Will the topology call the join transform before the settings-confirm
> >>> transform before the settings-update transform?
>
> That depends what data the consumer fetches and this part is hard to
> predict. For this reason, you need to buffer multiple records in a
> store, in case data does not arrive in the order and you need it
> (between different topics) and later do the processing in the correct
> order when you got all data you need. Does this make sense?
>
> This is the underlying problem for KStream-KTable join, too. If might
> happen hat we get 100 KTable records that we all process before we
> receive 100 KStream records. For the correct result it might be required
> to get 50 KTable and 50 KStream in the first poll call and the rest in
> the second. But we don't know and just process whatever we get.
>
>
> -Matthias
>
>
> On 1/16/18 7:14 PM, Dmitry Minkovsky wrote:
> > I meant “Thanks, yes I will try replacing...”
> >
> > вт, 16 янв. 2018 г. в 22:12, Dmitry Minkovsky <dm...@gmail.com>:
> >
> >> Thanks, yes try replacing the KStream-KTable joins with
> >> KStream#transform()s and a store. Not sure why you mean I’d need to
> buffer
> >> multiple records. The KStream has incoming events, and #transform() will
> >> let me mount the store and use it how I please. Within an application
> >> instance, any other KStream#transform()s using the same store will see
> the
> >> same data in real time.
> >>
> >> Now suppose I have three topics, each with events like this, each on
> their
> >> own KStream:
> >>
> >> T1 join
> >> T2 settings-confirm
> >> T3 settings-update
> >>
> >> Will the topology call the join transform before the settings-confirm
> >> transform before the settings-update transform?
> >>
> >>
> >>
> >> вт, 16 янв. 2018 г. в 21:39, Matthias J. Sax <ma...@confluent.io>:
> >>
> >>> You have more flexibility of course and thus can get better results.
> But
> >>> your code must be able to buffer multiple records from the KTable and
> >>> KStream input and also store the corresponding timestamps to perform
> the
> >>> join correctly. It's not trivial but also also not rocket-science.
> >>>
> >>> If we need stronger guarantees, it's the best way to follow though atm,
> >>> until we have addressed those issues. Planned for 1.2.0 release.
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> >>>> Right now I am thinking of re-writing anything that has these
> >>> problematic
> >>>> KStream/KTable joins as KStream#transform() wherein the state store is
> >>>> manually used. Does that makes sense as an option for me?
> >>>>
> >>>> -Dmitry
> >>>>
> >>>> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <
> dminkovsky@gmail.com
> >>>>
> >>>> wrote:
> >>>>
> >>>>> Earlier today I posted this question to SO
> >>>>> <
> >>>
> https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly
> >>>>
> >>>>> :
> >>>>>
> >>>>>> I have a topology that looks like this:
> >>>>>
> >>>>>     KTable<ByteString, User> users = topology.table(USERS,
> >>>>> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
> >>>>>
> >>>>>     KStream<ByteString, JoinRequest> joinRequests =
> >>>>> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
> >>>>> joinRequestSerde))
> >>>>>         .mapValues(entityTopologyProcessor::userNew)
> >>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>>>>
> >>>>>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
> >>>>> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> >>>>>         .join(users, entityTopologyProcessor::userSettingsConfirm,
> >>>>> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
> >>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>>>>
> >>>>>     topology.stream(SETTINGS_UPDATE_REQUESTS,
> >>>>> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> >>>>>         .join(users, entityTopologyProcessor::userSettingsUpdate,
> >>>>> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
> >>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>>>>
> >>>>>> At runtime this topology works fine. Users are created with join
> >>>>> requests. They confirm their settings with settings confirm requests.
> >>> They
> >>>>> update their settings with settings update requests.
> >>>>>>
> >>>>>> However, reprocessing this topology does not produce the original
> >>>>> results. Specifically, the settings update joiner does not see the
> user
> >>>>> that resulted from the settings confirm joiner, even though in terms
> of
> >>>>> timestamps, many seconds elapse from the time the user is created, to
> >>> the
> >>>>> time the user is confirmed to the time the user updates their
> settings.
> >>>>>>
> >>>>>> I'm at a loss. I've tried turning off caching/logging on the user
> >>> table.
> >>>>> No idea what to do to make this reprocess properly.
> >>>>>
> >>>>> ----
> >>>>>
> >>>>> The response by Matthias, also on SO:
> >>>>>
> >>>>>> A KStream-KTable join is not 100% deterministic (and might never
> >>> become
> >>>>> 100% deterministic). We are aware of the problem and discuss
> >>> solutions, to
> >>>>> at least mitigate the issue.
> >>>>>>
> >>>>>> One problem is, that if a Consumer fetches from the brokers, we
> cannot
> >>>>> control easily for which topics and/or partitions the broker returns
> >>> data.
> >>>>> And depending on the order in which we receive data from the broker,
> >>> the
> >>>>> result might slightly differ.
> >>>>>>
> >>>>>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >>>>>>
> >>>>>> This blog post might help, too: https://www.confluent.io/blog/
> >>>>> crossing-streams-joins-apache-kafka/
> >>>>>
> >>>>> ----
> >>>>>
> >>>>> I don't really know what to do with this response. I have been aware
> of
> >>>>> some "slight" discrepancy that might occur in edge cases with
> >>>>> KStream-KTable joins for some time now, but what I'm seeing is not a
> >>> slight
> >>>>> discrepancy but very different results.
> >>>>>
> >>>>> I looked at the JIRA Matthias linked
> >>>>> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data
> >>> has
> >>>>> no late arriving records. I don't know about the empty buffers. I
> have
> >>> read
> >>>>> the blog post he linked several times already.
> >>>>>
> >>>>> Can someone please suggest how I may obviate this problem? For
> example
> >>>>>
> >>>>>    - Would it make sense for me to try launching the topology with
> >>> fewer
> >>>>>    threads during the reprocess?
> >>>>>    - Would it make sense for launch the topology with fewer input
> >>> tasks?
> >>>>>    - Would it make sense to increase size of the stream buffer?
> >>>>>
> >>>>> I am at a total loss at this point. I cannot believe that there is
> >>> nothing
> >>>>> I can do to replay this data and perform the migration I am trying to
> >>>>> perform, in order to release a next version of my application. Am I
> >>> totally
> >>>>> screwed?
> >>>>>
> >>>>>
> >>>>> Thank you,
> >>>>> Dmitry
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >
>
>

Re: Kafka Streams topology does not replay correctly

Posted by "Matthias J. Sax" <ma...@confluent.io>.
>>> The KStream has incoming events, and #transform() will
>>> let me mount the store and use it how I please. Within an application
>>> instance, any other KStream#transform()s using the same store will see the
>>> same data in real time.

That sounds basically correct. But you don't know the order (between
different topics) in which you will receive the data.

>>> Will the topology call the join transform before the settings-confirm
>>> transform before the settings-update transform?

That depends what data the consumer fetches and this part is hard to
predict. For this reason, you need to buffer multiple records in a
store, in case data does not arrive in the order and you need it
(between different topics) and later do the processing in the correct
order when you got all data you need. Does this make sense?

This is the underlying problem for KStream-KTable join, too. If might
happen hat we get 100 KTable records that we all process before we
receive 100 KStream records. For the correct result it might be required
to get 50 KTable and 50 KStream in the first poll call and the rest in
the second. But we don't know and just process whatever we get.


-Matthias


On 1/16/18 7:14 PM, Dmitry Minkovsky wrote:
> I meant “Thanks, yes I will try replacing...”
> 
> вт, 16 янв. 2018 г. в 22:12, Dmitry Minkovsky <dm...@gmail.com>:
> 
>> Thanks, yes try replacing the KStream-KTable joins with
>> KStream#transform()s and a store. Not sure why you mean I’d need to buffer
>> multiple records. The KStream has incoming events, and #transform() will
>> let me mount the store and use it how I please. Within an application
>> instance, any other KStream#transform()s using the same store will see the
>> same data in real time.
>>
>> Now suppose I have three topics, each with events like this, each on their
>> own KStream:
>>
>> T1 join
>> T2 settings-confirm
>> T3 settings-update
>>
>> Will the topology call the join transform before the settings-confirm
>> transform before the settings-update transform?
>>
>>
>>
>> вт, 16 янв. 2018 г. в 21:39, Matthias J. Sax <ma...@confluent.io>:
>>
>>> You have more flexibility of course and thus can get better results. But
>>> your code must be able to buffer multiple records from the KTable and
>>> KStream input and also store the corresponding timestamps to perform the
>>> join correctly. It's not trivial but also also not rocket-science.
>>>
>>> If we need stronger guarantees, it's the best way to follow though atm,
>>> until we have addressed those issues. Planned for 1.2.0 release.
>>>
>>> -Matthias
>>>
>>>
>>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
>>>> Right now I am thinking of re-writing anything that has these
>>> problematic
>>>> KStream/KTable joins as KStream#transform() wherein the state store is
>>>> manually used. Does that makes sense as an option for me?
>>>>
>>>> -Dmitry
>>>>
>>>> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkovsky@gmail.com
>>>>
>>>> wrote:
>>>>
>>>>> Earlier today I posted this question to SO
>>>>> <
>>> https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly
>>>>
>>>>> :
>>>>>
>>>>>> I have a topology that looks like this:
>>>>>
>>>>>     KTable<ByteString, User> users = topology.table(USERS,
>>>>> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>>>>>
>>>>>     KStream<ByteString, JoinRequest> joinRequests =
>>>>> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
>>>>> joinRequestSerde))
>>>>>         .mapValues(entityTopologyProcessor::userNew)
>>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>>>>
>>>>>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
>>>>> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>>>>>         .join(users, entityTopologyProcessor::userSettingsConfirm,
>>>>> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>>>>
>>>>>     topology.stream(SETTINGS_UPDATE_REQUESTS,
>>>>> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>>>>>         .join(users, entityTopologyProcessor::userSettingsUpdate,
>>>>> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>>>>
>>>>>> At runtime this topology works fine. Users are created with join
>>>>> requests. They confirm their settings with settings confirm requests.
>>> They
>>>>> update their settings with settings update requests.
>>>>>>
>>>>>> However, reprocessing this topology does not produce the original
>>>>> results. Specifically, the settings update joiner does not see the user
>>>>> that resulted from the settings confirm joiner, even though in terms of
>>>>> timestamps, many seconds elapse from the time the user is created, to
>>> the
>>>>> time the user is confirmed to the time the user updates their settings.
>>>>>>
>>>>>> I'm at a loss. I've tried turning off caching/logging on the user
>>> table.
>>>>> No idea what to do to make this reprocess properly.
>>>>>
>>>>> ----
>>>>>
>>>>> The response by Matthias, also on SO:
>>>>>
>>>>>> A KStream-KTable join is not 100% deterministic (and might never
>>> become
>>>>> 100% deterministic). We are aware of the problem and discuss
>>> solutions, to
>>>>> at least mitigate the issue.
>>>>>>
>>>>>> One problem is, that if a Consumer fetches from the brokers, we cannot
>>>>> control easily for which topics and/or partitions the broker returns
>>> data.
>>>>> And depending on the order in which we receive data from the broker,
>>> the
>>>>> result might slightly differ.
>>>>>>
>>>>>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>>>>>>
>>>>>> This blog post might help, too: https://www.confluent.io/blog/
>>>>> crossing-streams-joins-apache-kafka/
>>>>>
>>>>> ----
>>>>>
>>>>> I don't really know what to do with this response. I have been aware of
>>>>> some "slight" discrepancy that might occur in edge cases with
>>>>> KStream-KTable joins for some time now, but what I'm seeing is not a
>>> slight
>>>>> discrepancy but very different results.
>>>>>
>>>>> I looked at the JIRA Matthias linked
>>>>> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data
>>> has
>>>>> no late arriving records. I don't know about the empty buffers. I have
>>> read
>>>>> the blog post he linked several times already.
>>>>>
>>>>> Can someone please suggest how I may obviate this problem? For example
>>>>>
>>>>>    - Would it make sense for me to try launching the topology with
>>> fewer
>>>>>    threads during the reprocess?
>>>>>    - Would it make sense for launch the topology with fewer input
>>> tasks?
>>>>>    - Would it make sense to increase size of the stream buffer?
>>>>>
>>>>> I am at a total loss at this point. I cannot believe that there is
>>> nothing
>>>>> I can do to replay this data and perform the migration I am trying to
>>>>> perform, in order to release a next version of my application. Am I
>>> totally
>>>>> screwed?
>>>>>
>>>>>
>>>>> Thank you,
>>>>> Dmitry
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
> 


Re: Kafka Streams topology does not replay correctly

Posted by Dmitry Minkovsky <dm...@gmail.com>.
I meant “Thanks, yes I will try replacing...”

вт, 16 янв. 2018 г. в 22:12, Dmitry Minkovsky <dm...@gmail.com>:

> Thanks, yes try replacing the KStream-KTable joins with
> KStream#transform()s and a store. Not sure why you mean I’d need to buffer
> multiple records. The KStream has incoming events, and #transform() will
> let me mount the store and use it how I please. Within an application
> instance, any other KStream#transform()s using the same store will see the
> same data in real time.
>
> Now suppose I have three topics, each with events like this, each on their
> own KStream:
>
> T1 join
> T2 settings-confirm
> T3 settings-update
>
> Will the topology call the join transform before the settings-confirm
> transform before the settings-update transform?
>
>
>
> вт, 16 янв. 2018 г. в 21:39, Matthias J. Sax <ma...@confluent.io>:
>
>> You have more flexibility of course and thus can get better results. But
>> your code must be able to buffer multiple records from the KTable and
>> KStream input and also store the corresponding timestamps to perform the
>> join correctly. It's not trivial but also also not rocket-science.
>>
>> If we need stronger guarantees, it's the best way to follow though atm,
>> until we have addressed those issues. Planned for 1.2.0 release.
>>
>> -Matthias
>>
>>
>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
>> > Right now I am thinking of re-writing anything that has these
>> problematic
>> > KStream/KTable joins as KStream#transform() wherein the state store is
>> > manually used. Does that makes sense as an option for me?
>> >
>> > -Dmitry
>> >
>> > On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkovsky@gmail.com
>> >
>> > wrote:
>> >
>> >> Earlier today I posted this question to SO
>> >> <
>> https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly
>> >
>> >> :
>> >>
>> >>> I have a topology that looks like this:
>> >>
>> >>     KTable<ByteString, User> users = topology.table(USERS,
>> >> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>> >>
>> >>     KStream<ByteString, JoinRequest> joinRequests =
>> >> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
>> >> joinRequestSerde))
>> >>         .mapValues(entityTopologyProcessor::userNew)
>> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>
>> >>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
>> >> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>> >>         .join(users, entityTopologyProcessor::userSettingsConfirm,
>> >> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>
>> >>     topology.stream(SETTINGS_UPDATE_REQUESTS,
>> >> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>> >>         .join(users, entityTopologyProcessor::userSettingsUpdate,
>> >> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>
>> >>> At runtime this topology works fine. Users are created with join
>> >> requests. They confirm their settings with settings confirm requests.
>> They
>> >> update their settings with settings update requests.
>> >>>
>> >>> However, reprocessing this topology does not produce the original
>> >> results. Specifically, the settings update joiner does not see the user
>> >> that resulted from the settings confirm joiner, even though in terms of
>> >> timestamps, many seconds elapse from the time the user is created, to
>> the
>> >> time the user is confirmed to the time the user updates their settings.
>> >>>
>> >>> I'm at a loss. I've tried turning off caching/logging on the user
>> table.
>> >> No idea what to do to make this reprocess properly.
>> >>
>> >> ----
>> >>
>> >> The response by Matthias, also on SO:
>> >>
>> >>> A KStream-KTable join is not 100% deterministic (and might never
>> become
>> >> 100% deterministic). We are aware of the problem and discuss
>> solutions, to
>> >> at least mitigate the issue.
>> >>>
>> >>> One problem is, that if a Consumer fetches from the brokers, we cannot
>> >> control easily for which topics and/or partitions the broker returns
>> data.
>> >> And depending on the order in which we receive data from the broker,
>> the
>> >> result might slightly differ.
>> >>>
>> >>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>> >>>
>> >>> This blog post might help, too: https://www.confluent.io/blog/
>> >> crossing-streams-joins-apache-kafka/
>> >>
>> >> ----
>> >>
>> >> I don't really know what to do with this response. I have been aware of
>> >> some "slight" discrepancy that might occur in edge cases with
>> >> KStream-KTable joins for some time now, but what I'm seeing is not a
>> slight
>> >> discrepancy but very different results.
>> >>
>> >> I looked at the JIRA Matthias linked
>> >> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data
>> has
>> >> no late arriving records. I don't know about the empty buffers. I have
>> read
>> >> the blog post he linked several times already.
>> >>
>> >> Can someone please suggest how I may obviate this problem? For example
>> >>
>> >>    - Would it make sense for me to try launching the topology with
>> fewer
>> >>    threads during the reprocess?
>> >>    - Would it make sense for launch the topology with fewer input
>> tasks?
>> >>    - Would it make sense to increase size of the stream buffer?
>> >>
>> >> I am at a total loss at this point. I cannot believe that there is
>> nothing
>> >> I can do to replay this data and perform the migration I am trying to
>> >> perform, in order to release a next version of my application. Am I
>> totally
>> >> screwed?
>> >>
>> >>
>> >> Thank you,
>> >> Dmitry
>> >>
>> >>
>> >>
>> >>
>> >
>>
>>

Re: Kafka Streams topology does not replay correctly

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Thanks, yes try replacing the KStream-KTable joins with
KStream#transform()s and a store. Not sure why you mean I’d need to buffer
multiple records. The KStream has incoming events, and #transform() will
let me mount the store and use it how I please. Within an application
instance, any other KStream#transform()s using the same store will see the
same data in real time.

Now suppose I have three topics, each with events like this, each on their
own KStream:

T1 join
T2 settings-confirm
T3 settings-update

Will the topology call the join transform before the settings-confirm
transform before the settings-update transform?



вт, 16 янв. 2018 г. в 21:39, Matthias J. Sax <ma...@confluent.io>:

> You have more flexibility of course and thus can get better results. But
> your code must be able to buffer multiple records from the KTable and
> KStream input and also store the corresponding timestamps to perform the
> join correctly. It's not trivial but also also not rocket-science.
>
> If we need stronger guarantees, it's the best way to follow though atm,
> until we have addressed those issues. Planned for 1.2.0 release.
>
> -Matthias
>
>
> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> > Right now I am thinking of re-writing anything that has these problematic
> > KStream/KTable joins as KStream#transform() wherein the state store is
> > manually used. Does that makes sense as an option for me?
> >
> > -Dmitry
> >
> > On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dm...@gmail.com>
> > wrote:
> >
> >> Earlier today I posted this question to SO
> >> <
> https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly
> >
> >> :
> >>
> >>> I have a topology that looks like this:
> >>
> >>     KTable<ByteString, User> users = topology.table(USERS,
> >> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
> >>
> >>     KStream<ByteString, JoinRequest> joinRequests =
> >> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
> >> joinRequestSerde))
> >>         .mapValues(entityTopologyProcessor::userNew)
> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
> >> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> >>         .join(users, entityTopologyProcessor::userSettingsConfirm,
> >> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >>     topology.stream(SETTINGS_UPDATE_REQUESTS,
> >> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> >>         .join(users, entityTopologyProcessor::userSettingsUpdate,
> >> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >>> At runtime this topology works fine. Users are created with join
> >> requests. They confirm their settings with settings confirm requests.
> They
> >> update their settings with settings update requests.
> >>>
> >>> However, reprocessing this topology does not produce the original
> >> results. Specifically, the settings update joiner does not see the user
> >> that resulted from the settings confirm joiner, even though in terms of
> >> timestamps, many seconds elapse from the time the user is created, to
> the
> >> time the user is confirmed to the time the user updates their settings.
> >>>
> >>> I'm at a loss. I've tried turning off caching/logging on the user
> table.
> >> No idea what to do to make this reprocess properly.
> >>
> >> ----
> >>
> >> The response by Matthias, also on SO:
> >>
> >>> A KStream-KTable join is not 100% deterministic (and might never become
> >> 100% deterministic). We are aware of the problem and discuss solutions,
> to
> >> at least mitigate the issue.
> >>>
> >>> One problem is, that if a Consumer fetches from the brokers, we cannot
> >> control easily for which topics and/or partitions the broker returns
> data.
> >> And depending on the order in which we receive data from the broker, the
> >> result might slightly differ.
> >>>
> >>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >>>
> >>> This blog post might help, too: https://www.confluent.io/blog/
> >> crossing-streams-joins-apache-kafka/
> >>
> >> ----
> >>
> >> I don't really know what to do with this response. I have been aware of
> >> some "slight" discrepancy that might occur in edge cases with
> >> KStream-KTable joins for some time now, but what I'm seeing is not a
> slight
> >> discrepancy but very different results.
> >>
> >> I looked at the JIRA Matthias linked
> >> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data
> has
> >> no late arriving records. I don't know about the empty buffers. I have
> read
> >> the blog post he linked several times already.
> >>
> >> Can someone please suggest how I may obviate this problem? For example
> >>
> >>    - Would it make sense for me to try launching the topology with fewer
> >>    threads during the reprocess?
> >>    - Would it make sense for launch the topology with fewer input tasks?
> >>    - Would it make sense to increase size of the stream buffer?
> >>
> >> I am at a total loss at this point. I cannot believe that there is
> nothing
> >> I can do to replay this data and perform the migration I am trying to
> >> perform, in order to release a next version of my application. Am I
> totally
> >> screwed?
> >>
> >>
> >> Thank you,
> >> Dmitry
> >>
> >>
> >>
> >>
> >
>
>

Re: Kafka Streams topology does not replay correctly

Posted by "Matthias J. Sax" <ma...@confluent.io>.
You have more flexibility of course and thus can get better results. But
your code must be able to buffer multiple records from the KTable and
KStream input and also store the corresponding timestamps to perform the
join correctly. It's not trivial but also also not rocket-science.

If we need stronger guarantees, it's the best way to follow though atm,
until we have addressed those issues. Planned for 1.2.0 release.

-Matthias


On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> Right now I am thinking of re-writing anything that has these problematic
> KStream/KTable joins as KStream#transform() wherein the state store is
> manually used. Does that makes sense as an option for me?
> 
> -Dmitry
> 
> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dm...@gmail.com>
> wrote:
> 
>> Earlier today I posted this question to SO
>> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>
>> :
>>
>>> I have a topology that looks like this:
>>
>>     KTable<ByteString, User> users = topology.table(USERS,
>> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>>
>>     KStream<ByteString, JoinRequest> joinRequests =
>> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
>> joinRequestSerde))
>>         .mapValues(entityTopologyProcessor::userNew)
>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>
>>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
>> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>>         .join(users, entityTopologyProcessor::userSettingsConfirm,
>> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>
>>     topology.stream(SETTINGS_UPDATE_REQUESTS,
>> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>>         .join(users, entityTopologyProcessor::userSettingsUpdate,
>> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>
>>> At runtime this topology works fine. Users are created with join
>> requests. They confirm their settings with settings confirm requests. They
>> update their settings with settings update requests.
>>>
>>> However, reprocessing this topology does not produce the original
>> results. Specifically, the settings update joiner does not see the user
>> that resulted from the settings confirm joiner, even though in terms of
>> timestamps, many seconds elapse from the time the user is created, to the
>> time the user is confirmed to the time the user updates their settings.
>>>
>>> I'm at a loss. I've tried turning off caching/logging on the user table.
>> No idea what to do to make this reprocess properly.
>>
>> ----
>>
>> The response by Matthias, also on SO:
>>
>>> A KStream-KTable join is not 100% deterministic (and might never become
>> 100% deterministic). We are aware of the problem and discuss solutions, to
>> at least mitigate the issue.
>>>
>>> One problem is, that if a Consumer fetches from the brokers, we cannot
>> control easily for which topics and/or partitions the broker returns data.
>> And depending on the order in which we receive data from the broker, the
>> result might slightly differ.
>>>
>>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>>>
>>> This blog post might help, too: https://www.confluent.io/blog/
>> crossing-streams-joins-apache-kafka/
>>
>> ----
>>
>> I don't really know what to do with this response. I have been aware of
>> some "slight" discrepancy that might occur in edge cases with
>> KStream-KTable joins for some time now, but what I'm seeing is not a slight
>> discrepancy but very different results.
>>
>> I looked at the JIRA Matthias linked
>> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data has
>> no late arriving records. I don't know about the empty buffers. I have read
>> the blog post he linked several times already.
>>
>> Can someone please suggest how I may obviate this problem? For example
>>
>>    - Would it make sense for me to try launching the topology with fewer
>>    threads during the reprocess?
>>    - Would it make sense for launch the topology with fewer input tasks?
>>    - Would it make sense to increase size of the stream buffer?
>>
>> I am at a total loss at this point. I cannot believe that there is nothing
>> I can do to replay this data and perform the migration I am trying to
>> perform, in order to release a next version of my application. Am I totally
>> screwed?
>>
>>
>> Thank you,
>> Dmitry
>>
>>
>>
>>
> 


Re: Kafka Streams topology does not replay correctly

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Right now I am thinking of re-writing anything that has these problematic
KStream/KTable joins as KStream#transform() wherein the state store is
manually used. Does that makes sense as an option for me?

-Dmitry

On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dm...@gmail.com>
wrote:

> Earlier today I posted this question to SO
> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>
> :
>
> > I have a topology that looks like this:
>
>     KTable<ByteString, User> users = topology.table(USERS,
> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>
>     KStream<ByteString, JoinRequest> joinRequests =
> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
> joinRequestSerde))
>         .mapValues(entityTopologyProcessor::userNew)
>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>
>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>         .join(users, entityTopologyProcessor::userSettingsConfirm,
> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>
>     topology.stream(SETTINGS_UPDATE_REQUESTS,
> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>         .join(users, entityTopologyProcessor::userSettingsUpdate,
> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>
> > At runtime this topology works fine. Users are created with join
> requests. They confirm their settings with settings confirm requests. They
> update their settings with settings update requests.
> >
> > However, reprocessing this topology does not produce the original
> results. Specifically, the settings update joiner does not see the user
> that resulted from the settings confirm joiner, even though in terms of
> timestamps, many seconds elapse from the time the user is created, to the
> time the user is confirmed to the time the user updates their settings.
> >
> > I'm at a loss. I've tried turning off caching/logging on the user table.
> No idea what to do to make this reprocess properly.
>
> ----
>
> The response by Matthias, also on SO:
>
> > A KStream-KTable join is not 100% deterministic (and might never become
> 100% deterministic). We are aware of the problem and discuss solutions, to
> at least mitigate the issue.
> >
> > One problem is, that if a Consumer fetches from the brokers, we cannot
> control easily for which topics and/or partitions the broker returns data.
> And depending on the order in which we receive data from the broker, the
> result might slightly differ.
> >
> > One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >
> > This blog post might help, too: https://www.confluent.io/blog/
> crossing-streams-joins-apache-kafka/
>
> ----
>
> I don't really know what to do with this response. I have been aware of
> some "slight" discrepancy that might occur in edge cases with
> KStream-KTable joins for some time now, but what I'm seeing is not a slight
> discrepancy but very different results.
>
> I looked at the JIRA Matthias linked
> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data has
> no late arriving records. I don't know about the empty buffers. I have read
> the blog post he linked several times already.
>
> Can someone please suggest how I may obviate this problem? For example
>
>    - Would it make sense for me to try launching the topology with fewer
>    threads during the reprocess?
>    - Would it make sense for launch the topology with fewer input tasks?
>    - Would it make sense to increase size of the stream buffer?
>
> I am at a total loss at this point. I cannot believe that there is nothing
> I can do to replay this data and perform the migration I am trying to
> perform, in order to release a next version of my application. Am I totally
> screwed?
>
>
> Thank you,
> Dmitry
>
>
>
>