Posted to users@kafka.apache.org by Vasily Sulatskov <va...@sulatskov.net> on 2018/07/12 16:50:48 UTC

Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Hi,

I am doing some experiments with kafka-streams KGroupedTable
aggregation, and admittedly I am not wiping data properly on each
restart, partially because I also wonder what would happen if you
change a streams topology without doing a proper reset.

I've noticed that from time to time, kafka-streams
KGroupedTable.reduce() can call the subtractor function with a null
aggregator value, and if you try to work around that by interpreting
a null aggregator value as zero for numeric values, you get an
incorrect aggregation result.
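
To make that failure mode concrete, here is a minimal plain-Java sketch (no Kafka dependency) that replays the subtractor/adder call sequence from the log quoted further down in this thread while reading a null aggregate as zero. The class and method names are invented for illustration; only the numbers (732 and 751 for three keys) come from the thread.

```java
import java.util.function.BinaryOperator;

// Hypothetical stand-alone model; this is not Kafka Streams code.
final class NullAsZeroReplay {

    // The work-around under test: a null aggregate is silently read as 0.
    static int orZero(Integer agg) {
        return agg == null ? 0 : agg;
    }

    // Replays SUB(oldValue) followed by ADD(newValue) once per key,
    // starting from a store that has never seen any of the keys.
    static int replay(int keys, int oldValue, int newValue) {
        BinaryOperator<Integer> adder = (agg, v) -> orZero(agg) + v;
        BinaryOperator<Integer> subtractor = (agg, v) -> orZero(agg) - v;

        Integer agg = null;
        for (int i = 0; i < keys; i++) {
            // Retraction of a value that was never added to this aggregate.
            agg = subtractor.apply(agg, oldValue);
            agg = adder.apply(agg, newValue);
        }
        return orZero(agg);
    }

    public static void main(String[] args) {
        System.out.println("null-as-zero result: " + replay(3, 732, 751)); // 57
        System.out.println("expected sum:        " + 3 * 751);             // 2253
    }
}
```

Each retraction removes a value that was never part of the aggregate, so the result ends up as three times the per-key increase instead of the sum of the latest values.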

I do understand that the proper way of handling this is to do a reset
on topology changes, but I'd like to understand if there's any
legitimate case when kafka-streams can call an adder or a subtractor
with a null aggregator value. Should I plan for this, or should I
interpret this as an invalid state, terminate the application, and
do a proper reset?

Also, I can't seem to find a guide which explains when application
reset is necessary. Intuitively it seems that it should be done every
time a topology changes. Any other cases?

I tried to debug where the null value comes from, and it seems that
KTableReduce.process() is getting called with a Change<V> value with
newValue == null and some non-null oldValue, which leads to the
subtractor being called with a null aggregator value. I wonder how it is
possible to have an old value for a key without a new value (does it
happen because of the auto commit interval?).

I've also noticed that it's possible for an input value from a topic
to bypass aggregation function entirely and be directly transmitted to
the output in certain cases: oldAgg is null, newValue is not null and
oldValue is null - in that case newValue will be transmitted directly
to the output. I suppose it's the correct behaviour, but feels a bit
weird nonetheless. And I've actually been able to observe this
behaviour in practice. I suppose it's also caused by this happening
right before a commit happens, and the message is sent to a changelog
topic.
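
The pass-through case can be reproduced with a small model of one reduce step. This is a simplified plain-Java sketch modelled on the reduce logic quoted later in this thread, not the actual KTableReduce source; `ReduceStepModel`, `Change` and `step` are illustrative names. It counts adder invocations to show that the first value for a key becomes the aggregate without the adder being called.

```java
import java.util.function.BinaryOperator;

// Simplified model of one table-reduce step; names are illustrative only.
final class ReduceStepModel {

    // Stand-in for the (newValue, oldValue) pair discussed in the thread.
    static final class Change<V> {
        final V newValue;
        final V oldValue;

        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    static int adderCalls = 0;

    static <V> V step(V oldAgg, Change<V> change,
                      BinaryOperator<V> adder, BinaryOperator<V> subtractor) {
        V newAgg = oldAgg;
        if (change.newValue != null) {
            // With no previous aggregate, the new value is stored as-is.
            newAgg = (newAgg == null) ? change.newValue : adder.apply(newAgg, change.newValue);
        }
        if (change.oldValue != null) {
            newAgg = subtractor.apply(newAgg, change.oldValue);
        }
        return newAgg;
    }

    static Integer countingAdd(Integer agg, Integer v) {
        adderCalls++;
        return agg + v;
    }

    public static void main(String[] args) {
        BinaryOperator<Integer> add = ReduceStepModel::countingAdd;
        BinaryOperator<Integer> sub = (agg, v) -> agg - v;

        Integer first = step(null, new Change<Integer>(5, null), add, sub);
        System.out.println(first + " after " + adderCalls + " adder calls");  // 5 after 0 adder calls

        Integer second = step(first, new Change<Integer>(7, null), add, sub);
        System.out.println(second + " after " + adderCalls + " adder calls"); // 12 after 1 adder calls
    }
}
```

This matches the usual contract of a reduce: the first element seeds the aggregate, and the combining function only runs from the second element onwards.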

Please can someone with more knowledge shed some light on these issues?

-- 
Best regards,
Vasily Sulatskov

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by Vasily Sulatskov <va...@sulatskov.net>.
Thank you everyone for your explanations, that's been most enlightening.
On Wed, Jul 18, 2018 at 2:28 AM Matthias J. Sax <ma...@confluent.io> wrote:
>
> I see -- sorry for misunderstanding initially.
>
> I agree that it would be possible to detect. Feel free to file a Jira
> for this improvement and maybe pick it up by yourself :)
>
>
> -Matthias
>
> On 7/17/18 3:01 PM, Vasily Sulatskov wrote:
> > Hi,
> >
> > I do understand that in the general case it's not possible to guarantee
> > that the newValue and oldValue parts of a Change message arrive at the
> > same partition, and I guess that's not really in the plans, but if I
> > correctly understand how it works, it should be possible to detect if
> > both newValue and oldValue go to the same partition and keep them
> > together, thus improving kafka-streams consistency guarantees. Right?
> >
> > For example, right now I have a use case where, when I perform
> > groupBy on a table, my new keys are computed purely from old keys, and
> > not from the value. And handling of such cases (not the general case)
> > can be improved.
> > On Tue, Jul 17, 2018 at 1:48 AM Matthias J. Sax <ma...@confluent.io> wrote:
> >>
> >> It is not possible to use a single message, because both messages may go
> >> to different partitions and may be processed by different application
> >> instances.
> >>
> >> Note that the overall KTable state is sharded. Updating a single
> >> upstream shard might require updating two different downstream shards.
> >>
> >>
> >> -Matthias
> >>
> >> On 7/16/18 2:50 PM, Vasily Sulatskov wrote:
> >>> Hi,
> >>>
> >>> It seems that it wouldn't be that difficult to address: just don't
> >>> break Change(newVal, oldVal) into Change(newVal, null) /
> >>> Change(null, oldVal) and update aggregator value in one .process()
> >>> call.
> >>>
> >>> Would this change make sense?
> >>> On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax <ma...@confluent.io> wrote:
> >>>>
> >>>> Vasily,
> >>>>
> >>>> yes, it can happen. As you noticed, both messages might be processed on
> >>>> different machines. Thus, Kafka Streams provides 'eventual consistency'
> >>>> guarantees.
> >>>>
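A toy model of that eventual-consistency window, assuming a plain integer sum as the aggregate: each single-sided change is applied as its own message, and the aggregate is recorded after every message. The class and method names are hypothetical, and whether an intermediate value is actually emitted downstream depends on caching and commit timing, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: Change(new, null) is a positive delta,
// Change(null, old) is a negative delta, applied one message at a time.
final class IntermediateAggregateSketch {

    // Returns the aggregate as it looks after each individual message.
    static List<Integer> aggregatesAfterEachMessage(int initial, int... deltas) {
        List<Integer> seen = new ArrayList<>();
        int agg = initial;
        for (int delta : deltas) {
            agg += delta;
            seen.add(agg);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Update of one upstream value from 732 to 751, update processed first.
        System.out.println(aggregatesAfterEachMessage(732, 751, -732)); // [1483, 751]
        // Same update, retraction processed first.
        System.out.println(aggregatesAfterEachMessage(732, -732, 751)); // [0, 751]
    }
}
```

Both orders converge on 751, but an observer reading the aggregate between the two messages sees 1483 or 0.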
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> >>>>> Hi John,
> >>>>>
> >>>>> Thanks a lot for your explanation. It does make much more sense now.
> >>>>>
> >>>>> The Jira issue I think is pretty well explained (with a reference to
> >>>>> this thread). And I've left my 2 cents in the pull request.
> >>>>>
> >>>>> You are right I didn't notice that repartition topic contains the same
> >>>>> message effectively twice, and 0/1 bytes are non-visible, so when I
> >>>>> used kafka-console-consumer I didn't notice that. So I have a quick
> >>>>> suggestion here, wouldn't it make sense to change 0 and 1 bytes to
> >>>>> something that has visible corresponding ascii characters, say + and
> >>>>> -, as these messages are effectively commands to reducer to execute
> >>>>> either an addition or subtraction?
> >>>>>
> >>>>> On a more serious note, can you please explain the temporal aspects of
> >>>>> how change messages are handled? More specifically, is it guaranteed
> >>>>> that both Change(newValue, null) and Change(null, oldValue) are
> >>>>> handled before a new aggregated value is committed to an output topic?
> >>>>> Change(newValue, null) and Change(null, oldValue) are delivered as two
> >>>>> separate messages via a kafka topic, and when they are read from a
> >>>>> topic (possibly on a different machine where a commit interval is
> >>>>> asynchronous to a machine that's put these changes into a topic) can
> >>>>> it happen that a Change(newValue, null) is processed by a
> >>>>> KTableReduceProcessor, the value of the aggregator is updated and
> >>>>> committed to the changelog topic, and a Change(null, oldValue) is
> >>>>> processed only in the next commit interval? If I understand this
> >>>>> correctly, that would mean that in an aggregated table an incorrect
> >>>>> aggregated value will be observed briefly, before being eventually
> >>>>> corrected.
> >>>>>
> >>>>> Can that happen? Or am I missing something that would make it impossible?
> >>>>> On Fri, Jul 13, 2018 at 8:05 PM John Roesler <jo...@confluent.io> wrote:
> >>>>>>
> >>>>>> Hi Vasily,
> >>>>>>
> >>>>>> I'm glad you're making me look at this; it's good homework for me!
> >>>>>>
> >>>>>> This is very non-obvious, but here's what happens:
> >>>>>>
> >>>>>> KStreamReduce is a Processor of (K, V) => (K, Change<V>). I.e., it emits
> >>>>>> new/old Change pairs as the value.
> >>>>>>
> >>>>>> Next is the Select (aka GroupBy). In the DSL code, this is the
> >>>>>> KTableRepartitionMap (we call it a repartition when you select a new key,
> >>>>>> since the new keys may belong to different partitions).
> >>>>>> KTableRepartitionMap is a processor that does two things:
> >>>>>> 1. it maps K => K1 (new keys) and V => V1 (new values)
> >>>>>> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
> >>>>>> null)]
> >>>>>> In other words, it turns each Change event into two events: a retraction
> >>>>>> and an update
> >>>>>>
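The "explode" step described above can be sketched in plain Java. This is an illustrative model, not the KTableRepartitionMap source: the `Change` class and `explode` method are hypothetical names, and the key/value mapping (K => K1, V => V1) is left out so that only the splitting is shown.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of splitting one Change into a retraction and an update.
final class ExplodeSketch {

    static final class Change<V> {
        final V newValue;
        final V oldValue;

        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }

        @Override
        public String toString() {
            return "Change(" + newValue + ", " + oldValue + ")";
        }
    }

    // Emits Change(null, old) first, then Change(new, null),
    // skipping whichever half is absent.
    static <V> List<Change<V>> explode(Change<V> change) {
        List<Change<V>> out = new ArrayList<>();
        if (change.oldValue != null) {
            out.add(new Change<V>(null, change.oldValue)); // retraction
        }
        if (change.newValue != null) {
            out.add(new Change<V>(change.newValue, null)); // update
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(explode(new Change<Integer>(751, 732)));
        // [Change(null, 732), Change(751, null)]
    }
}
```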
> >>>>>> Next comes the reduce operation. In building the processor node for this
> >>>>>> operation, we create the sink, repartition topic, and source, followed by
> >>>>>> the actual Reduce node. So if you want to look at how the changes get
> >>>>>> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
> >>>>>> You'll see that the sink and source use a ChangedSerializer and ChangedDeserializer.
> >>>>>>
> >>>>>> By looking into those implementations, I found that they depend on each
> >>>>>> Change containing just one of new OR old. They serialize the underlying
> >>>>>> value using the serde you provide, along with a single byte that signifies
> >>>>>> if the serialized value is the new or old value, which the deserializer
> >>>>>> uses on the receiving end to turn it back into a Change(new, null) or
> >>>>>> Change(null, old) as appropriate. This is why the repartition topic looks
> >>>>>> like it's just the raw data. It basically is, except for the magic byte.
> >>>>>>
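A rough sketch of such a "payload plus one flag byte" encoding, to show why the repartition topic looks like raw data in a console consumer. This is not the ChangedSerializer source: the class name, the trailing position of the flag, and the use of UTF-8 strings in place of the user-supplied serde are assumptions made for this sketch. The 0/1 flag values follow the "0/1 bytes" mentioned elsewhere in this thread; which value marks "new" is assumed here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical wire format: payload bytes followed by a single flag byte.
final class FlaggedValueCodec {
    static final byte OLD_FLAG = 0; // assumed to mark Change(null, old)
    static final byte NEW_FLAG = 1; // assumed to mark Change(new, null)

    static byte[] encode(String value, boolean isNew) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        // copyOf pads with one extra byte, which then holds the flag.
        byte[] out = Arrays.copyOf(payload, payload.length + 1);
        out[payload.length] = isNew ? NEW_FLAG : OLD_FLAG;
        return out;
    }

    static boolean isNew(byte[] encoded) {
        return encoded[encoded.length - 1] == NEW_FLAG;
    }

    static String payload(byte[] encoded) {
        return new String(encoded, 0, encoded.length - 1, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] update = encode("732", true);
        System.out.println(payload(update) + " isNew=" + isNew(update)); // 732 isNew=true
    }
}
```

Because the flag is a non-printing byte, a tool that prints the record as text shows what looks like the plain value twice, once per flag.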
> >>>>>> Does that make sense?
> >>>>>>
> >>>>>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
> >>>>>> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
> >>>>>> leaving any feedback you have?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <va...@sulatskov.net>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi John,
> >>>>>>>
> >>>>>>> Thanks for your explanation.
> >>>>>>>
> >>>>>>> I have an answer to the practical question, i.e. a null aggregator
> >>>>>>> value should be interpreted as a fatal application error.
> >>>>>>>
> >>>>>>> On the other hand, looking at the app topology, I see that a message
> >>>>>>> from KSTREAM-REDUCE-0000000002 / "table" goes to
> >>>>>>> KTABLE-SELECT-0000000006 which in turn forwards data to
> >>>>>>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
> >>>>>>> this point I assume that data goes back to kafka into a *-repartition
> >>>>>>> topic, after that the message is read from kafka by
> >>>>>>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
> >>>>>>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
> >>>>>>> [aggregated-table]), where the actual aggregation takes place. What I
> >>>>>>> don't get is where this Change value comes from, I mean if it's been
> >>>>>>> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
> >>>>>>> message goes through kafka where it gets serialized, and looking at
> >>>>>>> kafka "repartition" topic, it contains regular values, not a pair of
> >>>>>>> old/new.
> >>>>>>>
> >>>>>>> As far as I understand, Change is a purely in-memory representation of
> >>>>>>> the state for a particular key, and at no point is it serialized back
> >>>>>>> to kafka, yet somehow these Change values make it to the reducer. I feel
> >>>>>>> like I am missing something here. Could you please clarify this?
> >>>>>>>
> >>>>>>> Can you please point me to a place in kafka-streams sources where a
> >>>>>>> Change of newValue/oldValue is produced, so I could take a look? I
> >>>>>>> found the KTableReduce implementation, but can't find what produces these
> >>>>>>> Change values.
> >>>>>>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <jo...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>> Hi again Vasily,
> >>>>>>>>
> >>>>>>>> Ok, it looks to me like this behavior is the result of the un-clean
> >>>>>>>> topology change.
> >>>>>>>>
> >>>>>>>> Just in case you're interested, here's what I think happened.
> >>>>>>>>
> >>>>>>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table")
> >>>>>>>> internally emits pairs of "oldValue"/"newValue". (Side note: it's by
> >>>>>>>> forwarding both the old and new value that we are able to maintain
> >>>>>>>> aggregates using the subtractor/adder pairs.)
> >>>>>>>>
> >>>>>>>> 2. In the full topology, these old/new pairs go through some
> >>>>>>>> transformations, but still in some form eventually make their way down to
> >>>>>>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
> >>>>>>>>
> >>>>>>>> 3. The reduce processor logic looks like this:
> >>>>>>>> final V oldAgg = store.get(key);
> >>>>>>>> V newAgg = oldAgg;
> >>>>>>>>
> >>>>>>>> // first try to add the new value
> >>>>>>>> if (value.newValue != null) {
> >>>>>>>>     if (newAgg == null) {
> >>>>>>>>         newAgg = value.newValue;
> >>>>>>>>     } else {
> >>>>>>>>         newAgg = addReducer.apply(newAgg, value.newValue);
> >>>>>>>>     }
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> // then try to remove the old value
> >>>>>>>> if (value.oldValue != null) {
> >>>>>>>>     // Here's where the assumption breaks down...
> >>>>>>>>     newAgg = removeReducer.apply(newAgg, value.oldValue);
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> 4. Here's what I think happened. This processor saw an event like
> >>>>>>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block,
> >>>>>>>> and (since "oldValue != null") would go into the second block. I think that
> >>>>>>>> in the normal case we can rely on the invariant that any value we get as an
> >>>>>>>> "oldValue" is one that we've previously seen (as "newValue").
> >>>>>>>> Consequently, we should be able to assume that if we get a non-null
> >>>>>>>> "oldValue", "newAgg" will also not be null (because we would have written
> >>>>>>>> it to the store back when we saw it as "newValue" and then retrieved it
> >>>>>>>> just now as "newAgg = oldAgg").
> >>>>>>>>
> >>>>>>>> However, if subtopology2, along with KTABLE-SELECT-0000000006
> >>>>>>>> and KSTREAM-SINK-0000000013 get added after (KSTREAM-REDUCE-0000000002 /
> >>>>>>>> "table") has already emitted some values, then we might in fact receive an
> >>>>>>>> event with some "oldValue" that we have in fact never seen before (because
> >>>>>>>> (KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when it
> >>>>>>>> was first emitted as a "newValue").
> >>>>>>>>
> >>>>>>>> This would violate our assumption, and we would unintentionally send a
> >>>>>>>> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
> >>>>>>>> If you want to double-check my reasoning, you should be able to do so in
> >>>>>>>> the debugger with a breakpoint in KTableReduce.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> tl;dr: Supposing you reset the app when the topology changes, I think that
> >>>>>>>> you should be able to rely on non-null aggregates being passed in to *both*
> >>>>>>>> the adder and subtractor in a reduce.
> >>>>>>>>
> >>>>>>>> I would be in favor, as you suggested, of adding an explicit check and
> >>>>>>>> throwing an exception if the aggregate is ever null at those points. This
> >>>>>>>> would actually help us detect if the topology has changed unexpectedly and
> >>>>>>>> shut down, hopefully before any damage is done. I'll send a PR and see what
> >>>>>>>> everyone thinks.
> >>>>>>>>
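Until such a check exists in the library, a similar guard can be applied on the application side. The following is a minimal sketch under the assumption that the adder and subtractor are wrapped before being handed to the reduce; `java.util.function.BinaryOperator` stands in for the Kafka Streams reducer type, and `NullAggregateGuard` / `failOnNullAggregate` are hypothetical names.

```java
import java.util.function.BinaryOperator;

// Hypothetical application-side guard; not part of Kafka Streams.
final class NullAggregateGuard {

    // Wraps an adder or subtractor so that a null aggregate fails fast
    // instead of being silently treated as an empty value.
    static <V> BinaryOperator<V> failOnNullAggregate(String role, BinaryOperator<V> delegate) {
        return (agg, value) -> {
            if (agg == null) {
                throw new IllegalStateException(
                    role + " called with a null aggregate; the state may not match the topology");
            }
            return delegate.apply(agg, value);
        };
    }

    public static void main(String[] args) {
        BinaryOperator<Integer> subtractor =
            NullAggregateGuard.<Integer>failOnNullAggregate("subtractor", (agg, v) -> agg - v);

        System.out.println(subtractor.apply(751, 732)); // 19
        try {
            subtractor.apply(null, 732);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast turns the silent miscalculation seen in the logs below into an immediate, visible error.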
> >>>>>>>> Does this all seem like it adds up to you?
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi John,
> >>>>>>>>>
> >>>>>>>>> Thanks for your reply. I am not sure if this behavior I've observed is
> >>>>>>>>> a bug or not, as I've not been resetting my application properly. On
> >>>>>>>>> the other hand if the subtractor or adder in the reduce operation are
> >>>>>>>>> never supposed to be called with null aggregator value, perhaps it
> >>>>>>>>> would make sense to put a null check in the table reduce
> >>>>>>>>> implementation to detect an application entering an invalid state. A
> >>>>>>>>> bit like a check for topics having the same number of partitions when
> >>>>>>>>> doing a join.
> >>>>>>>>>
> >>>>>>>>> Here's some information about my tests. Hope that can be useful:
> >>>>>>>>>
> >>>>>>>>> Topology at start:
> >>>>>>>>>
> >>>>>>>>> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
> >>>>>>>>>    Sub-topology: 0
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>>>>>       --> KSTREAM-MAP-0000000001
> >>>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000003
> >>>>>>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>>>>>
> >>>>>>>>>   Sub-topology: 1
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>>>>>       --> KTABLE-TOSTREAM-0000000006
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000007
> >>>>>>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
> >>>>>>>>>       <-- KTABLE-TOSTREAM-0000000006
> >>>>>>>>>
> >>>>>>>>> This topology just takes data from the source topic "slope" which
> >>>>>>>>> produces messages like this:
> >>>>>>>>>
> >>>>>>>>> key1
> >>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>>>>>> key3
> >>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>>>>>> key2
> >>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>>>>>> key3
> >>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>>>>> key1
> >>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>>>>> key2
> >>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>>>>>
> >>>>>>>>> Every second, 3 new messages arrive from the "slope" topic for
> >>>>>>>>> keys key1, key2 and key3, with constantly increasing values.
> >>>>>>>>> Data is transformed so that the original key is also tracked in the
> >>>>>>>>> message value, grouped by key, and windowed with a custom window, and
> >>>>>>>>> reduced with a dummy reduce operation to make a KTable.
> >>>>>>>>> KTable is converted back to a stream, and sent to a topic (just for
> >>>>>>>>> debugging purposes).
> >>>>>>>>>
> >>>>>>>>> Here's the source (it's kafka-streams-scala for the most part). Also
> >>>>>>>>> please ignore silly classes, it's obviously a test:
> >>>>>>>>>
> >>>>>>>>>     val slopeTable = builder
> >>>>>>>>>       .stream[String, TimedValue]("slope")
> >>>>>>>>>       .map(
> >>>>>>>>>         (key, value) =>
> >>>>>>>>>           (
> >>>>>>>>>             StringWrapper(key),
> >>>>>>>>>             TimedValueWithKey(value = value.value, timestamp = value.timestamp, key = key)
> >>>>>>>>>           )
> >>>>>>>>>       )
> >>>>>>>>>       .groupByKey
> >>>>>>>>>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
> >>>>>>>>>       .reduceMat((aggValue, newValue) => newValue, "table")
> >>>>>>>>>
> >>>>>>>>>     slopeTable.toStream
> >>>>>>>>>       .to("slope-table")
> >>>>>>>>>
> >>>>>>>>> Topology after change without a proper reset:
> >>>>>>>>>
> >>>>>>>>> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
> >>>>>>>>>    Sub-topology: 0
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>>>>>       --> KSTREAM-MAP-0000000001
> >>>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000003
> >>>>>>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>>>>>
> >>>>>>>>>   Sub-topology: 1
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000007
> >>>>>>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000013
> >>>>>>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
> >>>>>>>>>       <-- KTABLE-SELECT-0000000006
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
> >>>>>>>>>       <-- KTABLE-TOSTREAM-0000000012
> >>>>>>>>>
> >>>>>>>>>   Sub-topology: 2
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000008 (topics:
> >>>>>>>>> [aggregated-table-repartition])
> >>>>>>>>>       --> KTABLE-REDUCE-0000000009
> >>>>>>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
> >>>>>>>>>       --> KTABLE-TOSTREAM-0000000010
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000008
> >>>>>>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000011
> >>>>>>>>>       <-- KTABLE-REDUCE-0000000009
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
> >>>>>>>>>       <-- KTABLE-TOSTREAM-0000000010
> >>>>>>>>>
> >>>>>>>>> Here's the source of the sub-topology that does table aggregation:
> >>>>>>>>>
> >>>>>>>>>     slopeTable
> >>>>>>>>>       .groupBy(
> >>>>>>>>>         (key, value) => (new Windowed(StringWrapper("dummykey"), key.window()), value)
> >>>>>>>>>       )
> >>>>>>>>>       .reduceMat(adder = (aggValue, newValue) => {
> >>>>>>>>>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
> >>>>>>>>>         val agg = Option(aggValue)
> >>>>>>>>>         TimedValueWithKey(
> >>>>>>>>>           value = agg.map(_.value).getOrElse(0) + newValue.value,
> >>>>>>>>>           timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
> >>>>>>>>>           key = "reduced"
> >>>>>>>>>         )
> >>>>>>>>>       }, subtractor = (aggValue, newValue) => {
> >>>>>>>>>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
> >>>>>>>>>         val agg = Option(aggValue)
> >>>>>>>>>         TimedValueWithKey(
> >>>>>>>>>           value = agg.map(_.value).getOrElse(0) - newValue.value,
> >>>>>>>>>           timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
> >>>>>>>>>           key = "reduced"
> >>>>>>>>>         )
> >>>>>>>>>       }, "aggregated-table")
> >>>>>>>>>       .toStream
> >>>>>>>>>       .to("slope-aggregated-table")
> >>>>>>>>>
> >>>>>>>>> I log all calls to adder and subtractor, so I am able to see what's
> >>>>>>>>> going on there, as well as I track the original keys of the aggregated
> >>>>>>>>> values and their timestamps, so it's relatively easy to see how the
> >>>>>>>>> data goes through this topology
> >>>>>>>>>
> >>>>>>>>> In order to reproduce this behavior I need to:
> >>>>>>>>> 1. Start a full topology (with table aggregation)
> >>>>>>>>> 2. Start without table aggregation (no app reset)
> >>>>>>>>> 3. Start with table aggregation (no app reset)
> >>>>>>>>>
> >>>>>>>>> Below is an interpretation of the adder/subtractor logs for a given
> >>>>>>>>> key/window in chronological order:
> >>>>>>>>>
> >>>>>>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> >>>>>>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> >>>>>>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> >>>>>>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> >>>>>>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> >>>>>>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
> >>>>>>>>>
> >>>>>>>>> And in the end the last value that's materialized for that window
> >>>>>>>>> (i.e. windowed key) in the kafka topic is 57, i.e. the increase in value
> >>>>>>>>> for a single key between some point in the middle of the window and
> >>>>>>>>> the end of the window (751 - 732 = 19), times 3. As opposed to the
> >>>>>>>>> expected value of 751 * 3 = 2253 (sum of last values in a time window
> >>>>>>>>> for all keys being aggregated).
> >>>>>>>>>
> >>>>>>>>> It's clear to me that I should do an application reset, but I also
> >>>>>>>>> would like to understand, should I expect adder/subtractor being
> >>>>>>>>> called with null aggValue, or is it a clear sign that something went
> >>>>>>>>> horribly wrong?
> >>>>>>>>>
> >>>>>>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Vasily,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the email.
> >>>>>>>>>>
> >>>>>>>>>> To answer your question: you should reset the application basically any
> >>>>>>>>>> time you change the topology. Some transitions are safe, but others will
> >>>>>>>>>> result in data loss or corruption. Rather than try to reason about which is
> >>>>>>>>>> which, it's much safer just to either reset the app or not change it (if it
> >>>>>>>>>> has important state).
> >>>>>>>>>>
> >>>>>>>>>> Beyond changes that you make to the topology, we spend a lot of effort to
> >>>>>>>>>> try and make sure that different versions of Streams will produce the same
> >>>>>>>>>> topology, so unless the release notes say otherwise, you should be able to
> >>>>>>>>>> upgrade without a reset.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I can't say right now whether those wacky behaviors are bugs or the result
> >>>>>>>>>> of changing the topology without a reset. Or if they are correct but
> >>>>>>>>>> surprising behavior somehow. I'll look into it tomorrow. Do feel free to
> >>>>>>>>>> open a Jira ticket if you think you have found a bug, especially if you can
> >>>>>>>>>> describe a repro. Knowing your topology before and after the change would
> >>>>>>>>>> also be immensely helpful. You can print it with Topology.describe().
> >>>>>>>>>>
> >>>>>>>>>> Regardless, I'll make a note to take a look at the code tomorrow and try to
> >>>>>>>>>> decide if you should expect these behaviors with "clean" topology changes.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> -John
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Best regards,
> >>>>>>>>> Vasily Sulatskov
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Best regards,
> >>>>>>> Vasily Sulatskov
> >>>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
> >
>


--
Best regards,
Vasily Sulatskov

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I see -- sorry for misunderstanding initially.

I agree that it would be possible to detect. Feel free to file a Jira
for this improvement and maybe pick it up by yourself :)
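
The detection idea from the quoted message below could look roughly like this: compare the partition of the old grouped key with that of the new grouped key, and only keep the two halves of a change together when they match. This is a sketch under stated assumptions, not a proposal for the actual implementation; `SamePartitionCheck` and its methods are hypothetical, and `partitionFor` uses `hashCode()` as a stand-in rather than the hash of the serialized key that Kafka's default partitioner uses.

```java
// Hypothetical helper; not Kafka Streams code.
final class SamePartitionCheck {

    // Stand-in partitioner, chosen only to keep the sketch dependency-free.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True when the retraction (old grouped key) and the update (new grouped key)
    // would be written to the same partition of the repartition topic.
    static boolean canTravelTogether(Object oldGroupedKey, Object newGroupedKey, int numPartitions) {
        return partitionFor(oldGroupedKey, numPartitions) == partitionFor(newGroupedKey, numPartitions);
    }

    public static void main(String[] args) {
        // Grouped key derived only from the original key: always the same partition.
        System.out.println(canTravelTogether("dummykey", "dummykey", 8)); // true
        // Grouped key derived from a value that changed: may differ.
        System.out.println(canTravelTogether("a", "b", 2));               // false
    }
}
```

When the grouped key is computed purely from the original key, as in the use case described in the quoted message, the check always succeeds.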


-Matthias

On 7/17/18 3:01 PM, Vasily Sulatskov wrote:
> Hi,
> 
> I do understand that in the general case it's not possible to guarantee
> that the newValue and oldValue parts of a Change message arrive at the
> same partition, and I guess that's not really in the plans, but if I
> correctly understand how it works, it should be possible to detect if
> both newValue and oldValue go to the same partition and keep them
> together, thus improving kafka-streams consistency guarantees. Right?
> 
> For example, right now I have a use case where, when I perform
> groupBy on a table, my new keys are computed purely from old keys, and
> not from the value. And handling of such cases (not the general case)
> can be improved.
> On Tue, Jul 17, 2018 at 1:48 AM Matthias J. Sax <ma...@confluent.io> wrote:
>>
>> It is not possible to use a single message, because both messages may go
>> to different partitions and may be processed by different application
>> instances.
>>
>> Note that the overall KTable state is sharded. Updating a single
>> upstream shard might require updating two different downstream shards.
>>
>>
>> -Matthias
>>
>> On 7/16/18 2:50 PM, Vasily Sulatskov wrote:
>>> Hi,
>>>
>>> It seems that it wouldn't be that difficult to address: just don't
>>> break Change(newVal, oldVal) into Change(newVal, null) /
>>> Change(null, oldVal) and update aggregator value in one .process()
>>> call.
>>>
>>> Would this change make sense?
>>> On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax <ma...@confluent.io> wrote:
>>>>
>>>> Vasily,
>>>>
>>>> yes, it can happen. As you noticed, both messages might be processed on
>>>> different machines. Thus, Kafka Streams provides 'eventual consistency'
>>>> guarantees.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
>>>>> Hi John,
>>>>>
>>>>> Thanks a lot for your explanation. It makes much more sense now.
>>>>>
>>>>> I think the Jira issue is pretty well explained (with a reference to
>>>>> this thread). And I've left my 2 cents in the pull request.
>>>>>
>>>>> You are right: I didn't notice that the repartition topic effectively
>>>>> contains the same message twice. The 0/1 bytes are not visible, so I
>>>>> missed them when I used kafka-console-consumer. So I have a quick
>>>>> suggestion here: wouldn't it make sense to change the 0 and 1 bytes to
>>>>> something with visible ASCII characters, say + and -, as these
>>>>> messages are effectively commands to the reducer to execute either an
>>>>> addition or a subtraction?
>>>>>
>>>>> On a more serious note, can you please explain the temporal aspects of
>>>>> how change messages are handled? More specifically, is it guaranteed
>>>>> that both Change(newValue, null) and Change(null, oldValue) are
>>>>> handled before a new aggregated value is committed to an output topic?
>>>>> Change(newValue, null) and Change(null, oldValue) are delivered as two
>>>>> separate messages via a kafka topic. When they are read from that
>>>>> topic (possibly on a different machine, whose commit interval is
>>>>> asynchronous to that of the machine that put these changes into the
>>>>> topic), can it happen that a Change(newValue, null) is processed by a
>>>>> KTableReduceProcessor, the value of the aggregator is updated and
>>>>> committed to the changelog topic, and the Change(null, oldValue) is
>>>>> processed only in the next commit interval? If I understand this
>>>>> correctly, that would mean that an incorrect aggregated value will be
>>>>> observed briefly in the aggregated table before being eventually
>>>>> corrected.
>>>>>
>>>>> Can that happen? Or am I missing something that would make it impossible?
>>>>> On Fri, Jul 13, 2018 at 8:05 PM John Roesler <jo...@confluent.io> wrote:
>>>>>>
>>>>>> Hi Vasily,
>>>>>>
>>>>>> I'm glad you're making me look at this; it's good homework for me!
>>>>>>
>>>>>> This is very non-obvious, but here's what happens:
>>>>>>
>>>>>> KStreamReduce is a Processor of (K, V) => (K, Change<V>). I.e., it emits
>>>>>> new/old Change pairs as the value.
>>>>>>
>>>>>> Next is the Select (aka GroupBy). In the DSL code, this is the
>>>>>> KTableRepartitionMap (we call it a repartition when you select a new key,
>>>>>> since the new keys may belong to different partitions).
>>>>>> KTableRepartitionMap is a processor that does two things:
>>>>>> 1. it maps K => K1 (new keys) and V => V1 (new values)
>>>>>> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
>>>>>> null)]
>>>>>> In other words, it turns each Change event into two events: a retraction
>>>>>> and an update
>>>>>>
>>>>>> Next comes the reduce operation. In building the processor node for this
>>>>>> operation, we create the sink, repartition topic, and source, followed by
>>>>>> the actual Reduce node. So if you want to look at how the changes get
>>>>>> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
>>>>>> You'll see that the sink and source use a ChangedSerializer and a
>>>>>> ChangedDeserializer.
>>>>>>
>>>>>> By looking into those implementations, I found that they depend on each
>>>>>> Change containing just one of new OR old. They serialize the underlying
>>>>>> value using the serde you provide, along with a single byte that signifies
>>>>>> if the serialized value is the new or old value, which the deserializer
>>>>>> uses on the receiving end to turn it back into a Change(new, null) or
>>>>>> Change(null, old) as appropriate. This is why the repartition topic looks
>>>>>> like it's just the raw data. It basically is, except for the magic byte.
>>>>>>
>>>>>> Does that make sense?
>>>>>>
>>>>>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
>>>>>> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
>>>>>> leaving any feedback you have?
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <va...@sulatskov.net>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi John,
>>>>>>>
>>>>>>> Thanks for your explanation.
>>>>>>>
>>>>>>> I have an answer to the practical question, i.e. a null aggregator
>>>>>>> value should be interpreted as a fatal application error.
>>>>>>>
>>>>>>> On the other hand, looking at the app topology, I see that a message
>>>>>>> from KSTREAM-REDUCE-0000000002 / "table" goes to
>>>>>>> KTABLE-SELECT-0000000006 which in turn forwards data to
>>>>>>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
>>>>>>> this point I assume that data goes back to kafka into a *-repartition
>>>>>>> topic, after that the message is read from kafka by
>>>>>>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
>>>>>>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
>>>>>>> [aggregated-table]), where the actual aggregation takes place. What I
>>>>>>> don't get is where this Change value comes from, I mean if it's been
>>>>>>> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
>>>>>>> message goes through kafka where it gets serialized, and looking at
>>>>>>> kafka "repartition" topic, it contains regular values, not a pair of
>>>>>>> old/new.
>>>>>>>
>>>>>>> As far as I understand, Change is a purely in-memory representation of
>>>>>>> the state for a particular key, and at no point is it serialized back
>>>>>>> to kafka, yet somehow these Change values make it to the reducer. I feel
>>>>>>> like I am missing something here. Could you please clarify this?
>>>>>>>
>>>>>>> Can you please point me to the place in the kafka-streams sources where a
>>>>>>> Change of newValue/oldValue is produced, so I could take a look? I
>>>>>>> found the KTableReduce implementation, but can't find what produces these
>>>>>>> Change values.
>>>>>>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <jo...@confluent.io> wrote:
>>>>>>>>
>>>>>>>> Hi again Vasily,
>>>>>>>>
>>>>>>>> Ok, it looks to me like this behavior is the result of the un-clean
>>>>>>>> topology change.
>>>>>>>>
>>>>>>>> Just in case you're interested, here's what I think happened.
>>>>>>>>
>>>>>>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table")
>>>>>>>> internally emits pairs of "oldValue"/"newValue". (Side note: it's by
>>>>>>>> forwarding both the old and new value that we are able to maintain
>>>>>>>> aggregates using the subtractor/adder pairs.)
>>>>>>>>
>>>>>>>> 2. In the full topology, these old/new pairs go through some
>>>>>>>> transformations, but still in some form eventually make their way down to
>>>>>>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
>>>>>>>>
>>>>>>>> 3. The reduce processor logic looks like this:
>>>>>>>> final V oldAgg = store.get(key);
>>>>>>>> V newAgg = oldAgg;
>>>>>>>>
>>>>>>>> // first try to add the new value
>>>>>>>> if (value.newValue != null) {
>>>>>>>>     if (newAgg == null) {
>>>>>>>>         newAgg = value.newValue;
>>>>>>>>     } else {
>>>>>>>>         newAgg = addReducer.apply(newAgg, value.newValue);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> // then try to remove the old value
>>>>>>>> if (value.oldValue != null) {
>>>>>>>>     // Here's where the assumption breaks down...
>>>>>>>>     newAgg = removeReducer.apply(newAgg, value.oldValue);
>>>>>>>> }
>>>>>>>>
>>>>>>>> 4. Here's what I think happened. This processor saw an event like
>>>>>>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block and
>>>>>>>> (since "oldValue != null") would go into the second block. I think that in
>>>>>>>> the normal case we can rely on the invariant that any value we get as an
>>>>>>>> "oldValue" is one that we've previously seen (as "newValue").
>>>>>>>> Consequently, we should be able to assume that if we get a non-null
>>>>>>>> "oldValue", "newAgg" will also not be null (because we would have written
>>>>>>>> it to the store back when we saw it as "newValue" and then retrieved it
>>>>>>>> just now as "newAgg = oldAgg").
>>>>>>>>
>>>>>>>> However, if subtopology2, along with KTABLE-SELECT-0000000006
>>>>>>>> and KSTREAM-SINK-0000000013, gets added after (KSTREAM-REDUCE-0000000002 /
>>>>>>>> "table") has already emitted some values, then we might in fact receive an
>>>>>>>> event with some "oldValue" that we have in fact never seen before (because
>>>>>>>> (KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when it
>>>>>>>> was first emitted as a "newValue").
>>>>>>>>
>>>>>>>> This would violate our assumption, and we would unintentionally send a
>>>>>>>> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
>>>>>>>> If you want to double-check my reasoning, you should be able to do so in
>>>>>>>> the debugger with a breakpoint in KTableReduce.
>>>>>>>>
>>>>>>>>
>>>>>>>> tl;dr: Supposing you reset the app when the topology changes, I think that
>>>>>>>> you should be able to rely on non-null aggregates being passed in to *both*
>>>>>>>> the adder and subtractor in a reduce.
>>>>>>>>
>>>>>>>> I would be in favor, as you suggested, of adding an explicit check and
>>>>>>>> throwing an exception if the aggregate is ever null at those points. This
>>>>>>>> would actually help us detect if the topology has changed unexpectedly and
>>>>>>>> shut down, hopefully before any damage is done. I'll send a PR and see what
>>>>>>>> everyone thinks.
>>>>>>>>
>>>>>>>> Does this all seem like it adds up to you?
>>>>>>>> -John
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi John,
>>>>>>>>>
>>>>>>>>> Thanks for your reply. I am not sure if this behavior I've observed is
>>>>>>>>> a bug or not, as I've not been resetting my application properly. On
>>>>>>>>> the other hand if the subtractor or adder in the reduce operation are
>>>>>>>>> never supposed to be called with null aggregator value, perhaps it
>>>>>>>>> would make sense to put a null check in the table reduce
>>>>>>>>> implementation to detect an application entering an invalid state. A
>>>>>>>>> bit like a check for topics having the same number of partitions when
>>>>>>>>> doing a join.
>>>>>>>>>
>>>>>>>>> Here's some information about my tests. Hope that can be useful:
>>>>>>>>>
>>>>>>>>> Topology at start:
>>>>>>>>>
>>>>>>>>> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
>>>>>>>>>    Sub-topology: 0
>>>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>>>>>       --> KSTREAM-MAP-0000000001
>>>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>>>>>       --> KSTREAM-SINK-0000000003
>>>>>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>>>>>
>>>>>>>>>   Sub-topology: 1
>>>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>>>>>       --> KTABLE-TOSTREAM-0000000006
>>>>>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
>>>>>>>>>       --> KSTREAM-SINK-0000000007
>>>>>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
>>>>>>>>>       <-- KTABLE-TOSTREAM-0000000006
>>>>>>>>>
>>>>>>>>> This topology just takes data from the source topic "slope" which
>>>>>>>>> produces messages like this:
>>>>>>>>>
>>>>>>>>> key1
>>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>>>>>> key3
>>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>>>>>> key2
>>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>>>>>> key3
>>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>>>>> key1
>>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>>>>> key2
>>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>>>>>
>>>>>>>>> Every second, 3 new messages arrive from the "slope" topic for
>>>>>>>>> keys key1, key2 and key3, with a constantly increasing value.
>>>>>>>>> Data is transformed so that the original key is also tracked in the
>>>>>>>>> message value, grouped by key, and windowed with a custom window, and
>>>>>>>>> reduced with a dummy reduce operation to make a KTable.
>>>>>>>>> KTable is converted back to a stream, and sent to a topic (just for
>>>>>>>>> debugging purposes).
>>>>>>>>>
>>>>>>>>> Here's the source (it's kafka-streams-scala for the most part). Also
>>>>>>>>> please ignore silly classes, it's obviously a test:
>>>>>>>>>
>>>>>>>>>     val slopeTable = builder
>>>>>>>>>       .stream[String, TimedValue]("slope")
>>>>>>>>>       .map(
>>>>>>>>>         (key, value) =>
>>>>>>>>>           (
>>>>>>>>>             StringWrapper(key),
>>>>>>>>>             TimedValueWithKey(value = value.value, timestamp = value.timestamp, key = key)
>>>>>>>>>         )
>>>>>>>>>       )
>>>>>>>>>       .groupByKey
>>>>>>>>>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
>>>>>>>>>       .reduceMat((aggValue, newValue) => newValue, "table")
>>>>>>>>>
>>>>>>>>>     slopeTable.toStream
>>>>>>>>>       .to("slope-table")
>>>>>>>>>
>>>>>>>>> Topology after change without a proper reset:
>>>>>>>>>
>>>>>>>>> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
>>>>>>>>>    Sub-topology: 0
>>>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>>>>>       --> KSTREAM-MAP-0000000001
>>>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>>>>>       --> KSTREAM-SINK-0000000003
>>>>>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>>>>>
>>>>>>>>>   Sub-topology: 1
>>>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
>>>>>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
>>>>>>>>>       --> KSTREAM-SINK-0000000007
>>>>>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
>>>>>>>>>       --> KSTREAM-SINK-0000000013
>>>>>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
>>>>>>>>>       <-- KTABLE-SELECT-0000000006
>>>>>>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
>>>>>>>>>       <-- KTABLE-TOSTREAM-0000000012
>>>>>>>>>
>>>>>>>>>   Sub-topology: 2
>>>>>>>>>     Source: KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition])
>>>>>>>>>       --> KTABLE-REDUCE-0000000009
>>>>>>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
>>>>>>>>>       --> KTABLE-TOSTREAM-0000000010
>>>>>>>>>       <-- KSTREAM-SOURCE-0000000008
>>>>>>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
>>>>>>>>>       --> KSTREAM-SINK-0000000011
>>>>>>>>>       <-- KTABLE-REDUCE-0000000009
>>>>>>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
>>>>>>>>>       <-- KTABLE-TOSTREAM-0000000010
>>>>>>>>>
>>>>>>>>> Here's the source of the sub-topology that does table aggregation:
>>>>>>>>>
>>>>>>>>>     slopeTable
>>>>>>>>>       .groupBy(
>>>>>>>>>         (key, value) => (new Windowed(StringWrapper("dummykey"), key.window()), value)
>>>>>>>>>       )
>>>>>>>>>       .reduceMat(adder = (aggValue, newValue) => {
>>>>>>>>>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
>>>>>>>>>         val agg = Option(aggValue)
>>>>>>>>>         TimedValueWithKey(
>>>>>>>>>           value = agg.map(_.value).getOrElse(0) + newValue.value,
>>>>>>>>>           timestamp = Utils.latestTimestamp(
>>>>>>>>>             agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
>>>>>>>>>           key = "reduced"
>>>>>>>>>         )
>>>>>>>>>       }, subtractor = (aggValue, newValue) => {
>>>>>>>>>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
>>>>>>>>>         val agg = Option(aggValue)
>>>>>>>>>         TimedValueWithKey(
>>>>>>>>>           value = agg.map(_.value).getOrElse(0) - newValue.value,
>>>>>>>>>           timestamp = Utils.latestTimestamp(
>>>>>>>>>             agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
>>>>>>>>>           key = "reduced"
>>>>>>>>>         )
>>>>>>>>>       }, "aggregated-table")
>>>>>>>>>       .toStream
>>>>>>>>>       .to("slope-aggregated-table")
>>>>>>>>>
>>>>>>>>> I log all calls to the adder and subtractor, so I am able to see what's
>>>>>>>>> going on there. I also track the original keys of the aggregated
>>>>>>>>> values and their timestamps, so it's relatively easy to see how the
>>>>>>>>> data goes through this topology.
>>>>>>>>>
>>>>>>>>> In order to reproduce this behavior I need to:
>>>>>>>>> 1. Start a full topology (with table aggregation)
>>>>>>>>> 2. Start without table aggregation (no app reset)
>>>>>>>>> 3. Start with table aggregation (no app reset)
>>>>>>>>>
>>>>>>>>> Below is an interpretation of the adder/subtractor logs for a given
>>>>>>>>> key/window, in chronological order:
>>>>>>>>>
>>>>>>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
>>>>>>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
>>>>>>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
>>>>>>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
>>>>>>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
>>>>>>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
>>>>>>>>>
>>>>>>>>> And in the end the last value that's materialized for that window
>>>>>>>>> (i.e. windowed key) in the kafka topic is 57, i.e. the increase in value
>>>>>>>>> for a single key between some point in the middle of the window and
>>>>>>>>> the end of the window (751 - 732 = 19), times 3. As opposed to the expected
>>>>>>>>> value of 751 * 3 = 2253 (sum of last values in a time window for all keys
>>>>>>>>> being aggregated).
>>>>>>>>>
>>>>>>>>> It's clear to me that I should do an application reset, but I also
>>>>>>>>> would like to understand, should I expect adder/subtractor being
>>>>>>>>> called with null aggValue, or is it a clear sign that something went
>>>>>>>>> horribly wrong?
>>>>>>>>>
>>>>>>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Vasily,
>>>>>>>>>>
>>>>>>>>>> Thanks for the email.
>>>>>>>>>>
>>>>>>>>>> To answer your question: you should reset the application basically any
>>>>>>>>>> time you change the topology. Some transitions are safe, but others will
>>>>>>>>>> result in data loss or corruption. Rather than try to reason about which is
>>>>>>>>>> which, it's much safer just to either reset the app or not change it (if it
>>>>>>>>>> has important state).
>>>>>>>>>>
>>>>>>>>>> Beyond changes that you make to the topology, we spend a lot of effort to
>>>>>>>>>> try and make sure that different versions of Streams will produce the same
>>>>>>>>>> topology, so unless the release notes say otherwise, you should be able to
>>>>>>>>>> upgrade without a reset.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I can't say right now whether those wacky behaviors are bugs or the result
>>>>>>>>>> of changing the topology without a reset. Or if they are correct but
>>>>>>>>>> surprising behavior somehow. I'll look into it tomorrow. Do feel free to
>>>>>>>>>> open a Jira ticket if you think you have found a bug, especially if you can
>>>>>>>>>> describe a repro. Knowing your topology before and after the change would
>>>>>>>>>> also be immensely helpful. You can print it with Topology.describe().
>>>>>>>>>>
>>>>>>>>>> Regardless, I'll make a note to take a look at the code tomorrow and try to
>>>>>>>>>> decide if you should expect these behaviors with "clean" topology changes.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> -John
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <vasily@sulatskov.net>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am doing some experiments with kafka-streams KGroupedTable
>>>>>>>>>>> aggregation, and admittedly I am not wiping data properly on each
>>>>>>>>>>> restart, partially because I also wonder what would happen if you
>>>>>>>>>>> change a streams topology without doing a proper reset.
>>>>>>>>>>>
>>>>>>>>>>> I've noticed that from time to time, kafka-streams
>>>>>>>>>>> KGroupedTable.reduce() can call the subtractor function with a null
>>>>>>>>>>> aggregator value, and if you try to work around that by interpreting a
>>>>>>>>>>> null aggregator value as zero for a numeric value, you get an incorrect
>>>>>>>>>>> aggregation result.
>>>>>>>>>>>
>>>>>>>>>>> I do understand that the proper way of handling this is to do a reset
>>>>>>>>>>> on topology changes, but I'd like to understand if there's any
>>>>>>>>>>> legitimate case when kafka-streams can call an adder or a subtractor
>>>>>>>>>>> with a null aggregator value. Should I plan for this, or should I
>>>>>>>>>>> interpret this as an invalid state, terminate the application, and
>>>>>>>>>>> do a proper reset?
>>>>>>>>>>>
>>>>>>>>>>> Also, I can't seem to find a guide which explains when an application
>>>>>>>>>>> reset is necessary. Intuitively it seems that it should be done every
>>>>>>>>>>> time a topology changes. Any other cases?
>>>>>>>>>>>
>>>>>>>>>>> I tried to debug where the null value comes from, and it seems that
>>>>>>>>>>> KTableReduce.process() is getting called with a Change<V> value with
>>>>>>>>>>> newValue == null and some non-null oldValue, which leads to the
>>>>>>>>>>> subtractor being called with a null aggregator value. I wonder how it is
>>>>>>>>>>> possible to have an old value for a key without a new value (does it
>>>>>>>>>>> happen because of the auto commit interval?).
>>>>>>>>>>>
>>>>>>>>>>> I've also noticed that it's possible for an input value from a topic
>>>>>>>>>>> to bypass the aggregation function entirely and be directly transmitted to
>>>>>>>>>>> the output in certain cases: oldAgg is null, newValue is not null and
>>>>>>>>>>> oldValue is null - in that case newValue will be transmitted directly
>>>>>>>>>>> to the output. I suppose it's the correct behaviour, but it feels a bit
>>>>>>>>>>> weird nonetheless. And I've actually been able to observe this
>>>>>>>>>>> behaviour in practice. I suppose it's also caused by this happening
>>>>>>>>>>> right before a commit happens and the message is sent to a changelog
>>>>>>>>>>> topic.
>>>>>>>>>>>
>>>>>>>>>>> Please can someone with more knowledge shed some light on these issues?
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Vasily Sulatskov
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Vasily Sulatskov
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Vasily Sulatskov
>>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
> 
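
The adder/subtractor log quoted in this thread can be replayed outside of
Kafka. The following is only a minimal sketch under stated assumptions: the
class and method names are made up for illustration, values are reduced to
plain numbers (732 and 751, as in the log), and process() merely mirrors the
shape of the reduce logic John quoted. The "strict" flag is a hypothetical
stand-in for the null check discussed above, not the actual Kafka Streams code.

```java
import java.util.function.BinaryOperator;

class ReduceNullAggDemo {

    // Same shape as the reduce logic quoted in the thread, on boxed Longs.
    // With strict = true, a null aggregate in the removal step is treated as
    // an invalid state (hypothetical check, for illustration only).
    static Long process(Long oldAgg, Long newValue, Long oldValue,
                        BinaryOperator<Long> adder,
                        BinaryOperator<Long> subtractor,
                        boolean strict) {
        Long newAgg = oldAgg;
        if (newValue != null) {
            newAgg = (newAgg == null) ? newValue : adder.apply(newAgg, newValue);
        }
        if (oldValue != null) {
            if (strict && newAgg == null) {
                throw new IllegalStateException(
                    "retraction for a value that was never added");
            }
            newAgg = subtractor.apply(newAgg, oldValue);
        }
        return newAgg;
    }

    // Replays the six logged calls (SUB 732, ADD 751, three times) with
    // reducers that interpret a null aggregate as zero.
    static long replay(boolean strict) {
        BinaryOperator<Long> adder = (agg, v) -> (agg == null ? 0L : agg) + v;
        BinaryOperator<Long> subtractor = (agg, v) -> (agg == null ? 0L : agg) - v;
        Long agg = null; // the store never saw 732 as a newValue
        for (int i = 0; i < 3; i++) {
            agg = process(agg, null, 732L, adder, subtractor, strict); // retraction
            agg = process(agg, 751L, null, adder, subtractor, strict); // update
        }
        return agg;
    }

    public static void main(String[] args) {
        System.out.println("null-as-zero result: " + replay(false));
        try {
            replay(true);
        } catch (IllegalStateException e) {
            System.out.println("strict mode failed fast: " + e.getMessage());
        }
    }
}
```

With null treated as zero, the replay ends at 57 (3 * (751 - 732)) rather
than the expected 2253, because the three retractions of 732 were never
preceded by matching additions. The strict variant stops at the first
retraction instead of silently producing a wrong aggregate.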


Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by Vasily Sulatskov <va...@sulatskov.net>.
Hi,

I do understand that in the general case it's not possible to guarantee
that the newValue and oldValue parts of a Change message arrive at the
same partition, and I guess that's not really in the plans, but if I
understand correctly how it works, it should be possible to detect when
both newValue and oldValue go to the same partition and keep them
together, thus improving kafka-streams consistency guarantees. Right?

For example, right now I have a use case where, when I perform a
groupBy on a table, my new keys are computed purely from the old keys and
not from the value. Handling of such cases (not the general case)
could be improved.
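
To make the idea concrete, here is a rough sketch of the detection step. It
is not Kafka Streams code: Change, Keyed and explode() are hypothetical
stand-ins written for this example, and "same mapped key" is used as the
simplest sufficient condition for "same partition". Note also that, as
described elsewhere in this thread, the current serializer depends on each
Change carrying only one of new/old, so the wire format would have to change
as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class SamePartitionChangeSketch {

    // Hypothetical stand-in for the internal Change(new, old) pair.
    static final class Change<V> {
        final V newValue;
        final V oldValue;
        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // A record destined for the repartition topic (hypothetical).
    static final class Keyed<K, V> {
        final K key;
        final Change<V> change;
        Keyed(K key, Change<V> change) {
            this.key = key;
            this.change = change;
        }
    }

    // Maps the old and new values to their new keys. If both land on the
    // same key, the Change is kept whole; otherwise it is split into a
    // retraction followed by an update, as happens today.
    static <K, V, K1> List<Keyed<K1, V>> explode(K key, Change<V> change,
                                                 BiFunction<K, V, K1> keyMapper) {
        K1 newKey = change.newValue == null ? null : keyMapper.apply(key, change.newValue);
        K1 oldKey = change.oldValue == null ? null : keyMapper.apply(key, change.oldValue);
        List<Keyed<K1, V>> out = new ArrayList<>();
        if (newKey != null && newKey.equals(oldKey)) {
            out.add(new Keyed<>(newKey, change)); // one combined message
        } else {
            if (oldKey != null) {
                out.add(new Keyed<>(oldKey, new Change<>(null, change.oldValue)));
            }
            if (newKey != null) {
                out.add(new Keyed<>(newKey, new Change<>(change.newValue, null)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Key derived only from the old key: one combined message.
        System.out.println(explode("k", new Change<Integer>(2, 1), (k, v) -> "group").size());
        // Key derived from the value: retraction and update are split.
        System.out.println(explode("k", new Change<Integer>(2, 1),
                (k, v) -> v % 2 == 0 ? "even" : "odd").size());
    }
}
```

In my use case the mapper ignores the value, so the first branch would
always be taken and the reducer could apply the subtraction and the addition
in a single process() call.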
On Tue, Jul 17, 2018 at 1:48 AM Matthias J. Sax <ma...@confluent.io> wrote:
>
> It is not possible to use a single message, because the two messages may go
> to different partitions and may be processed by different application
> instances.
>
> Note that the overall KTable state is sharded. Updating a single
> upstream shard might require updating two different downstream shards.
>
>
> -Matthias
>
> On 7/16/18 2:50 PM, Vasily Sulatskov wrote:
> > Hi,
> >
> > It seems that it wouldn't be that difficult to address: just don't
> > break Change(newVal, oldVal) into Change(newVal, null) /
> > Change(null, oldVal), and update the aggregator value in one .process()
> > call.
> >
> > Would this change make sense?
> > On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax <ma...@confluent.io> wrote:
> >>
> >> Vasily,
> >>
> >> yes, it can happen. As you noticed, both messages might be processed on
> >> different machines. Thus, Kafka Streams provides 'eventual consistency'
> >> guarantees.
> >>
> >>
> >> -Matthias
> >>
> >> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> >>> Hi John,
> >>>
> >>> Thanks a lot for your explanation. It makes much more sense now.
> >>>
> >>> I think the Jira issue is pretty well explained (with a reference to
> >>> this thread). And I've left my 2 cents in the pull request.
> >>>
> >>> You are right: I didn't notice that the repartition topic effectively
> >>> contains the same message twice. The 0/1 bytes are not visible, so I
> >>> missed them when I used kafka-console-consumer. So I have a quick
> >>> suggestion here: wouldn't it make sense to change the 0 and 1 bytes to
> >>> something with visible ASCII characters, say + and -, as these
> >>> messages are effectively commands to the reducer to execute either an
> >>> addition or a subtraction?
> >>>
> >>> On a more serious note, can you please explain the temporal aspects of
> >>> how change messages are handled? More specifically, is it guaranteed
> >>> that both Change(newValue, null) and Change(null, oldValue) are
> >>> handled before a new aggregated value is committed to an output topic?
> >>> Change(newValue, null) and Change(null, oldValue) are delivered as two
> >>> separate messages via a kafka topic. When they are read from that
> >>> topic (possibly on a different machine, whose commit interval is
> >>> asynchronous to that of the machine that put these changes into the
> >>> topic), can it happen that a Change(newValue, null) is processed by a
> >>> KTableReduceProcessor, the value of the aggregator is updated and
> >>> committed to the changelog topic, and the Change(null, oldValue) is
> >>> processed only in the next commit interval? If I understand this
> >>> correctly, that would mean that an incorrect aggregated value will be
> >>> observed briefly in the aggregated table before being eventually
> >>> corrected.
> >>>
> >>> Can that happen? Or am I missing something that would make it impossible?
> >>> On Fri, Jul 13, 2018 at 8:05 PM John Roesler <jo...@confluent.io> wrote:
> >>>>
> >>>> Hi Vasily,
> >>>>
> >>>> I'm glad you're making me look at this; it's good homework for me!
> >>>>
> >>>> This is very non-obvious, but here's what happens:
> >>>>
> >>>> KStreamReduce is a Processor of (K, V) => (K, Change<V>). I.e., it emits
> >>>> new/old Change pairs as the value.
> >>>>
> >>>> Next is the Select (aka GroupBy). In the DSL code, this is the
> >>>> KTableRepartitionMap (we call it a repartition when you select a new key,
> >>>> since the new keys may belong to different partitions).
> >>>> KTableRepartitionMap is a processor that does two things:
> >>>> 1. it maps K => K1 (new keys) and V => V1 (new values)
> >>>> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
> >>>> null)]
> >>>> In other words, it turns each Change event into two events: a retraction
> >>>> and an update
> >>>>
> >>>> Next comes the reduce operation. In building the processor node for this
> >>>> operation, we create the sink, repartition topic, and source, followed by
> >>>> the actual Reduce node. So if you want to look at how the changes get
> >>>> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
> >>>> You'll see that the sink and source use a ChangedSerializer and a
> >>>> ChangedDeserializer.
> >>>>
> >>>> By looking into those implementations, I found that they depend on each
> >>>> Change containing just one of new OR old. They serialize the underlying
> >>>> value using the serde you provide, along with a single byte that signifies
> >>>> if the serialized value is the new or old value, which the deserializer
> >>>> uses on the receiving end to turn it back into a Change(new, null) or
> >>>> Change(null, old) as appropriate. This is why the repartition topic looks
> >>>> like it's just the raw data. It basically is, except for the magic byte.
> >>>>
> >>>> Does that make sense?
> >>>>
> >>>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
> >>>> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
> >>>> leaving any feedback you have?
> >>>>
> >>>> Thanks,
> >>>> -John
> >>>>
> >>>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <va...@sulatskov.net>
> >>>> wrote:
> >>>>
> >>>>> Hi John,
> >>>>>
> >>>>> Thanks for your explanation.
> >>>>>
> >>>>> I have an answer to the practical question, i.e. a null aggregator
> >>>>> value should be interpreted as a fatal application error.
> >>>>>
> >>>>> On the other hand, looking at the app topology, I see that a message
> >>>>> from KSTREAM-REDUCE-0000000002 / "table" goes to
> >>>>> KTABLE-SELECT-0000000006 which in turn forwards data to
> >>>>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
> >>>>> this point I assume that data goes back to kafka into a *-repartition
> >>>>> topic, after that the message is read from kafka by
> >>>>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
> >>>>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
> >>>>> [aggregated-table]), where the actual aggregation takes place. What I
> >>>>> don't get is where this Change value comes from, I mean if it's been
> >>>>> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
> >>>>> message goes through kafka where it gets serialized, and looking at
> >>>>> kafka "repartition" topic, it contains regular values, not a pair of
> >>>>> old/new.
> >>>>>
> >>>>> As far as I understand, Change is a purely in-memory representation of
> >>>>> the state for a particular key, and at no point is it serialized back
> >>>>> to kafka, yet somehow these Change values make it to the reducer. I feel
> >>>>> like I am missing something here. Could you please clarify this?
> >>>>>
> >>>>> Can you please point me to the place in the kafka-streams sources where a
> >>>>> Change of newValue/oldValue is produced, so I could take a look? I
> >>>>> found the KTableReduce implementation, but can't find what produces these
> >>>>> Change values.
> >>>>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <jo...@confluent.io> wrote:
> >>>>>>
> >>>>>> Hi again Vasily,
> >>>>>>
> >>>>>> Ok, it looks to me like this behavior is the result of the un-clean
> >>>>>> topology change.
> >>>>>>
> >>>>>> Just in case you're interested, here's what I think happened.
> >>>>>>
> >>>>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table"
> >>>>> )
> >>>>>> internally emits pairs of "oldValue"/"newValue" . (side-note: It's by
> >>>>>> forwarding both the old and new value that we are able to maintain
> >>>>>> aggregates using the subtractor/adder pairs)
> >>>>>>
> >>>>>> 2. In the full topology, these old/new pairs go through some
> >>>>>> transformations, but still in some form eventually make their way down to
> >>>>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
> >>>>>>
> >>>>>> 3. The reduce processor logic looks like this:
> >>>>>> final V oldAgg = store.get(key);
> >>>>>> V newAgg = oldAgg;
> >>>>>>
> >>>>>> // first try to add the new value
> >>>>>> if (value.newValue != null) {
> >>>>>>     if (newAgg == null) {
> >>>>>>         newAgg = value.newValue;
> >>>>>>     } else {
> >>>>>>         newAgg = addReducer.apply(newAgg, value.newValue);
> >>>>>>     }
> >>>>>> }
> >>>>>>
> >>>>>> // then try to remove the old value
> >>>>>> if (value.oldValue != null) {
> >>>>>>     // Here's where the assumption breaks down...
> >>>>>>     newAgg = removeReducer.apply(newAgg, value.oldValue);
> >>>>>> }
> >>>>>>
> >>>>>> 4. Here's what I think happened. This processor saw an event like
> >>>>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block,
> >>>>> and
> >>>>>> (since "oldValue != null") would go into the second block. I think that
> >>>>> in
> >>>>>> the normal case we can rely on the invariant that any value we get as an
> >>>>>> "oldValue" is one that we've previously seen ( as "newValue" ).
> >>>>>> Consequently, we should be able to assume that if we get a non-null
> >>>>>> "oldValue", "newAgg" will also not be null (because we would have written
> >>>>>> it to the store back when we saw it as "newValue" and then retrieved it
> >>>>>> just now as "newAgg = oldAgg").
> >>>>>>
> >>>>>> However, if subtopology2, along with KTABLE-SELECT-0000000006
> >>>>>> and KSTREAM-SINK-0000000013 get added after (KSTREAM-REDUCE-0000000002 /
> >>>>>> "table") has already emitted some values, then we might in fact receive
> >>>>> an
> >>>>>> event with some "oldValue" that we have in fact never seen before
> >>>>> (because (
> >>>>>> KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when
> >>>>> it
> >>>>>> was first emitted as a "newValue").
> >>>>>>
> >>>>>> This would violate our assumption, and we would unintentionally send a
> >>>>>> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
> >>>>>> If you want to double-check my reasoning, you should be able to do so in
> >>>>>> the debugger with a breakpoint in KTableReduce.
> >>>>>>
> >>>>>>
> >>>>>> tl;dr: Supposing you reset the app when the topology changes, I think
> >>>>> that
> >>>>>> you should be able to rely on non-null aggregates being passed in to
> >>>>> *both*
> >>>>>> the adder and subtractor in a reduce.
> >>>>>>
> >>>>>> I would be in favor, as you suggested, of adding an explicit check and
> >>>>>> throwing an exception if the aggregate is ever null at those points. This
> >>>>>> would actually help us detect if the topology has changed unexpectedly
> >>>>> and
> >>>>>> shut down, hopefully before any damage is done. I'll send a PR and see
> >>>>> what
> >>>>>> everyone thinks.
> >>>>>>
> >>>>>> Does this all seem like it adds up to you?
> >>>>>> -John
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi John,
> >>>>>>>
> >>>>>>> Thanks for your reply. I am not sure if this behavior I've observed is
> >>>>>>> a bug or not, as I've not been resetting my application properly. On
> >>>>>>> the other hand if the subtractor or adder in the reduce operation are
> >>>>>>> never supposed to be called with null aggregator value, perhaps it
> >>>>>>> would make sense to put a null check in the table reduce
> >>>>>>> implementation to detect an application entering an invalid state. A
> >>>>>>> bit like a check for topics having the same number of partitions when
> >>>>>>> doing a join.
> >>>>>>>
> >>>>>>> Here's some information about my tests. Hope that can be useful:
> >>>>>>>
> >>>>>>> Topology at start:
> >>>>>>>
> >>>>>>> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
> >>>>>>>    Sub-topology: 0
> >>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>>>       --> KSTREAM-MAP-0000000001
> >>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>>>       --> KSTREAM-SINK-0000000003
> >>>>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>>>
> >>>>>>>   Sub-topology: 1
> >>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>>>       --> KTABLE-TOSTREAM-0000000006
> >>>>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> >>>>>>>       --> KSTREAM-SINK-0000000007
> >>>>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
> >>>>>>>       <-- KTABLE-TOSTREAM-0000000006
> >>>>>>>
> >>>>>>> This topology just takes data from the source topic "slope" which
> >>>>>>> produces messages like this:
> >>>>>>>
> >>>>>>> key1
> >>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>>>> key3
> >>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>>>> key2
> >>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>>>> key3
> >>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>>> key1
> >>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>>> key2
> >>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>>>
> >>>>>>> Every second, 3 new messages arrive from the "slope" topic for
> >>>>>>> keys key1, key2 and key3, with constantly increasing value.
> >>>>>>> Data is transformed so that the original key is also tracked in the
> >>>>>>> message value, grouped by key, and windowed with a custom window, and
> >>>>>>> reduced with a dummy reduce operation to make a KTable.
> >>>>>>> KTable is converted back to a stream, and sent to a topic (just for
> >>>>>>> debugging purposes).
> >>>>>>>
> >>>>>>> Here's the source (it's kafka-streams-scala for the most part). Also
> >>>>>>> please ignore silly classes, it's obviously a test:
> >>>>>>>
> >>>>>>>     val slopeTable = builder
> >>>>>>>       .stream[String, TimedValue]("slope")
> >>>>>>>       .map(
> >>>>>>>         (key, value) =>
> >>>>>>>           (
> >>>>>>>             StringWrapper(key),
> >>>>>>>             TimedValueWithKey(value = value.value, timestamp =
> >>>>>>> value.timestamp, key = key)
> >>>>>>>         )
> >>>>>>>       )
> >>>>>>>       .groupByKey
> >>>>>>>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
> >>>>>>>       .reduceMat((aggValue, newValue) => newValue, "table")
> >>>>>>>
> >>>>>>>     slopeTable.toStream
> >>>>>>>       .to("slope-table")
> >>>>>>>
> >>>>>>> Topology after change without a proper reset:
> >>>>>>>
> >>>>>>> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
> >>>>>>>    Sub-topology: 0
> >>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>>>       --> KSTREAM-MAP-0000000001
> >>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>>>       --> KSTREAM-SINK-0000000003
> >>>>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>>>
> >>>>>>>   Sub-topology: 1
> >>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
> >>>>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
> >>>>>>>       --> KSTREAM-SINK-0000000007
> >>>>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
> >>>>>>>       --> KSTREAM-SINK-0000000013
> >>>>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
> >>>>>>>       <-- KTABLE-SELECT-0000000006
> >>>>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
> >>>>>>>       <-- KTABLE-TOSTREAM-0000000012
> >>>>>>>
> >>>>>>>   Sub-topology: 2
> >>>>>>>     Source: KSTREAM-SOURCE-0000000008 (topics:
> >>>>>>> [aggregated-table-repartition])
> >>>>>>>       --> KTABLE-REDUCE-0000000009
> >>>>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
> >>>>>>>       --> KTABLE-TOSTREAM-0000000010
> >>>>>>>       <-- KSTREAM-SOURCE-0000000008
> >>>>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
> >>>>>>>       --> KSTREAM-SINK-0000000011
> >>>>>>>       <-- KTABLE-REDUCE-0000000009
> >>>>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
> >>>>>>>       <-- KTABLE-TOSTREAM-0000000010
> >>>>>>>
> >>>>>>> Here's the source of the sub-topology that does table aggregation:
> >>>>>>>
> >>>>>>>     slopeTable
> >>>>>>>       .groupBy(
> >>>>>>>         (key, value) => (new Windowed(StringWrapper("dummykey"),
> >>>>>>> key.window()), value)
> >>>>>>>       )
> >>>>>>>       .reduceMat(adder = (aggValue, newValue) => {
> >>>>>>>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
> >>>>>>>         val agg = Option(aggValue)
> >>>>>>>         TimedValueWithKey(
> >>>>>>>           value = agg.map(_.value).getOrElse(0) + newValue.value,
> >>>>>>>           timestamp =
> >>>>>>>
> >>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> >>>>>>> newValue.timestamp),
> >>>>>>>           key = "reduced"
> >>>>>>>         )
> >>>>>>>       }, subtractor = (aggValue, newValue) => {
> >>>>>>>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
> >>>>>>>         val agg = Option(aggValue)
> >>>>>>>         TimedValueWithKey(
> >>>>>>>           value = agg.map(_.value).getOrElse(0) - newValue.value,
> >>>>>>>           timestamp =
> >>>>>>>
> >>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> >>>>>>> newValue.timestamp),
> >>>>>>>           key = "reduced"
> >>>>>>>         )
> >>>>>>>       }, "aggregated-table")
> >>>>>>>       .toStream
> >>>>>>>       .to("slope-aggregated-table")
> >>>>>>>
> >>>>>>> I log all calls to adder and subtractor, so I am able to see what's
> >>>>>>> going on there, as well as I track the original keys of the aggregated
> >>>>>>> values and their timestamps, so it's relatively easy to see how the
> >>>>>>> data goes through this topology
> >>>>>>>
> >>>>>>> In order to reproduce this behavior I need to:
> >>>>>>> 1. Start a full topology (with table aggregation)
> >>>>>>> 2. Start without table aggregation (no app reset)
> >>>>>>> 3. Start with table aggregation (no app reset)
> >>>>>>>
> >>>>>>> Below is an interpretation of the adder/subtractor logs for a given
> >>>>>>> key/window in the chronological order
> >>>>>>>
> >>>>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> >>>>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> >>>>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> >>>>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> >>>>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> >>>>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
> >>>>>>>
> >>>>>>> And in the end the last value that's materialized for that window
> >>>>>>> (i.e. windowed key) in the kafka topic is 57, i.e. an increase in value
> >>>>>>> for a single key between some point in the middle of the window and at
> >>>>>>> the end of the window, times 3. As opposed to the expected value of
> >>>>>>> 751 * 3 = 2253 (sum of last values in a time window for all keys being
> >>>>>>> aggregated).
> >>>>>>>
> >>>>>>> It's clear to me that I should do an application reset, but I also
> >>>>>>> would like to understand, should I expect adder/subtractor being
> >>>>>>> called with null aggValue, or is it a clear sign that something went
> >>>>>>> horribly wrong?
> >>>>>>>
> >>>>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi Vasily,
> >>>>>>>>
> >>>>>>>> Thanks for the email.
> >>>>>>>>
> >>>>>>>> To answer your question: you should reset the application basically
> >>>>> any
> >>>>>>>> time you change the topology. Some transitions are safe, but others
> >>>>> will
> >>>>>>>> result in data loss or corruption. Rather than try to reason about
> >>>>> which
> >>>>>>> is
> >>>>>>>> which, it's much safer just to either reset the app or not change it
> >>>>> (if
> >>>>>>> it
> >>>>>>>> has important state).
> >>>>>>>>
> >>>>>>>> Beyond changes that you make to the topology, we spend a lot of
> >>>>> effort to
> >>>>>>>> try and make sure that different versions of Streams will produce the
> >>>>>>> same
> >>>>>>>> topology, so unless the release notes say otherwise, you should be
> >>>>> able
> >>>>>>> to
> >>>>>>>> upgrade without a reset.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> I can't say right now whether those wacky behaviors are bugs or the
> >>>>>>> result
> >>>>>>>> of changing the topology without a reset. Or if they are correct but
> >>>>>>>> surprising behavior somehow. I'll look into it tomorrow. Do feel
> >>>>> free to
> >>>>>>>> open a Jira ticket if you think you have found a bug, especially if
> >>>>> you
> >>>>>>> can
> >>>>>>>> describe a repro. Knowing your topology before and after the change
> >>>>> would
> >>>>>>>> also be immensely helpful. You can print it with Topology.describe().
> >>>>>>>>
> >>>>>>>> Regardless, I'll make a note to take a look at the code tomorrow and
> >>>>> try
> >>>>>>> to
> >>>>>>>> decide if you should expect these behaviors with "clean" topology
> >>>>>>> changes.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <
> >>>>> vasily@sulatskov.net>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I am doing some experiments with kafka-streams KGroupedTable
> >>>>>>>>> aggregation, and admittedly I am not wiping data properly on each
> >>>>>>>>> restart, partially because I also wonder what would happen if you
> >>>>>>>>> change a streams topology without doing a proper reset.
> >>>>>>>>>
> >>>>>>>>> I've noticed that from time to time, kafka-streams
> >>>>>>>>> KGroupedTable.reduce() can call subtractor function with null
> >>>>>>>>> aggregator value, and if you try to work around that, by
> >>>>> interpreting
> >>>>>>>>> null aggregator value as zero for numeric value you get incorrect
> >>>>>>>>> aggregation result.
> >>>>>>>>>
> >>>>>>>>> I do understand that the proper way of handling this is to do a
> >>>>> reset
> >>>>>>>>> on topology changes, but I'd like to understand if there's any
> >>>>>>>>> legitimate case when kafka-streams can call an adder or a
> >>>>> subtractor
> >>>>>>>>> with null aggregator value, and should I plan for this, or should I
> >>>>>>>>> interpret this as an invalid state, and terminate the application,
> >>>>> and
> >>>>>>>>> do a proper reset?
> >>>>>>>>>
> >>>>>>>>> Also, I can't seem to find a guide which explains when application
> >>>>>>>>> reset is necessary. Intuitively it seems that it should be done
> >>>>> every
> >>>>>>>>> time a topology changes. Any other cases?
> >>>>>>>>>
> >>>>>>>>> I tried to debug where the null value comes from and it seems that
> >>>>>>>>> KTableReduce.process() is getting called with Change<V> value with
> >>>>>>>>> newValue == null, and some non-null oldValue. Which leads to the
> >>>>>>>>> subtractor being called with null aggregator value. I wonder how
> >>>>> it is
> >>>>>>>>> possible to have an old value for a key without a new value (does
> >>>>> it
> >>>>>>>>> happen because of the auto commit interval?).
> >>>>>>>>>
> >>>>>>>>> I've also noticed that it's possible for an input value from a
> >>>>> topic
> >>>>>>>>> to bypass aggregation function entirely and be directly
> >>>>> transmitted to
> >>>>>>>>> the output in certain cases: oldAgg is null, newValue is not null
> >>>>> and
> >>>>>>>>> oldValue is null - in that case newValue will be transmitted
> >>>>> directly
> >>>>>>>>> to the output. I suppose it's the correct behaviour, but feels a
> >>>>> bit
> >>>>>>>>> weird nonetheless. And I've actually been able to observe this
> >>>>>>>>> behaviour in practice. I suppose it's also caused by this happening
> >>>>>>>>> right before a commit happens, and the message is sent to a
> >>>>> changelog
> >>>>>>>>> topic.
> >>>>>>>>>
> >>>>>>>>> Please can someone with more knowledge shed some light on these
> >>>>> issues?
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Best regards,
> >>>>>>>>> Vasily Sulatskov
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Best regards,
> >>>>>>> Vasily Sulatskov
> >>>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Best regards,
> >>>>> Vasily Sulatskov
> >>>>>
> >>>
> >>>
> >>>
> >>
> >
> >
>


-- 
Best regards,
Vasily Sulatskov
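
For reference, the six adder/subtractor calls logged in the quoted message
above (SUB with aggValue=null first, then alternating ADD/SUB for key2, key1
and key3) can be replayed by hand. This is a minimal sketch in plain Java with
no Kafka dependency. It assumes the null-as-zero workaround from the quoted
Scala reducers, and the class and method names are hypothetical:

```java
final class ReduceReplaySketch {

    // Mirrors the quoted adder: a null aggregate is treated as zero (the workaround).
    static int add(Integer agg, int value) {
        return (agg == null ? 0 : agg) + value;
    }

    // Mirrors the quoted subtractor with the same null-as-zero workaround.
    static int subtract(Integer agg, int value) {
        return (agg == null ? 0 : agg) - value;
    }

    // Replays the logged sequence: for each of the three keys, the old value 732
    // is retracted and the new value 751 is added.
    static int replay() {
        Integer agg = null; // the store held no aggregate when the first SUB arrived
        for (int i = 0; i < 3; i++) {
            agg = subtract(agg, 732);
            agg = add(agg, 751);
        }
        return agg;
    }

    public static void main(String[] args) {
        System.out.println("final aggregate = " + replay());
    }
}
```

The replay ends at 57 = 3 * (751 - 732) rather than the expected
751 * 3 = 2253, because each retracted 732 had never been added to this store
in the first place.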

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by "Matthias J. Sax" <ma...@confluent.io>.
It is not possible to use a single message, because the two messages may go
to different partitions and may be processed by different application
instances.

Note that the overall KTable state is sharded. Updating a single
upstream shard might require updating two different downstream shards.
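
To illustrate, here is a minimal sketch in plain Java with no Kafka
dependency. It assumes a toy partitioner based on String.hashCode() (the real
partitioner hashes the serialized key bytes), and all class and method names
are hypothetical. When the grouping key of an upstream row changes, the
retraction of the old value and the addition of the new value are keyed
differently and can land on different partitions:

```java
import java.util.ArrayList;
import java.util.List;

final class ChangeSplitSketch {

    /** One half of an update as it might travel through a repartition topic (toy model). */
    static final class Half {
        final String groupKey;
        final boolean isNew; // true = value to add, false = value to retract
        final int value;
        final int partition;

        Half(String groupKey, boolean isNew, int value, int partition) {
            this.groupKey = groupKey;
            this.isNew = isNew;
            this.value = value;
            this.partition = partition;
        }
    }

    // Toy stand-in for a partitioner (assumption: hashCode modulo partition count).
    static int partitionFor(String groupKey, int numPartitions) {
        return Math.floorMod(groupKey.hashCode(), numPartitions);
    }

    // Splits one upstream update into a retraction (old) and an addition (new),
    // each routed by its own grouping key.
    static List<Half> split(String oldGroupKey, Integer oldValue,
                            String newGroupKey, Integer newValue,
                            int numPartitions) {
        List<Half> halves = new ArrayList<>();
        if (oldValue != null) {
            halves.add(new Half(oldGroupKey, false, oldValue,
                    partitionFor(oldGroupKey, numPartitions)));
        }
        if (newValue != null) {
            halves.add(new Half(newGroupKey, true, newValue,
                    partitionFor(newGroupKey, numPartitions)));
        }
        return halves;
    }

    public static void main(String[] args) {
        // The row moves from group "a" to group "b"; two downstream shards are affected.
        for (Half h : split("a", 10, "b", 12, 2)) {
            System.out.println((h.isNew ? "add " : "retract ") + h.value
                    + " for group " + h.groupKey + " -> partition " + h.partition);
        }
    }
}
```

With two partitions, groups "a" and "b" hash to different partitions in this
toy model, so no single message could carry both halves to the right place.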


-Matthias

On 7/16/18 2:50 PM, Vasily Sulatskov wrote:
> Hi,
> 
> It seems that it wouldn't be that difficult to address: just don't
> break Change(newVal, oldVal) into Change(newVal, null) /
> Change(null, oldVal) and update the aggregator value in one .process()
> call.
> 
> Would this change make sense?
> On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax <ma...@confluent.io> wrote:
>>
>> Vasily,
>>
>> yes, it can happen. As you noticed, both messages might be processed on
>> different machines. Thus, Kafka Streams provides 'eventual consistency'
>> guarantees.
>>
>>
>> -Matthias
>>
>> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
>>> Hi John,
>>>
>>> Thanks a lot for your explanation. It does make much more sense now.
>>>
>>> The Jira issue I think is pretty well explained (with a reference to
>>> this thread). And I've left my 2 cents in the pull request.
>>>
>>> You are right I didn't notice that repartition topic contains the same
>>> message effectively twice, and 0/1 bytes are non-visible, so when I
>>> used kafka-console-consumer I didn't notice that. So I have a quick
>>> suggestion here, wouldn't it make sense to change 0 and 1 bytes to
>>> something that has visible corresponding ascii characters, say + and
>>> -, as these messages are effectively commands to reducer to execute
>>> either an addition or subtraction?
>>>
>>> On a more serious side, can you please explain the temporal aspects of
>>> how change messages are handled? More specifically, is it guaranteed
>>> that both Change(newValue, null) and Change(null, oldValue) are
>>> handled before a new aggregated value is committed to an output topic?
>>> Change(newValue, null) and Change(null, oldValue) are delivered as two
>>> separate messages via a kafka topic, and when they are read from a
>>> topic (possibly on a different machine where a commit interval is
>>> asynchronous to a machine that's put these changes into a topic) can
>>> it happen so a Change(newValue, null) is processed by a
>>> KTableReduceProcessor, the value of the aggregator is updated, and
>>> committed to the changelog topic, and a Change(null, oldValue) is
>>> processed only in the next commit interval? If I understand this
>>> correctly that would mean that in an aggregated table an incorrect
>>> aggregated value will be observed briefly, before being eventually
>>> corrected.
>>>
>>> Can that happen? Or am I missing something that would make it impossible?
>>> On Fri, Jul 13, 2018 at 8:05 PM John Roesler <jo...@confluent.io> wrote:
>>>>
>>>> Hi Vasily,
>>>>
>>>> I'm glad you're making me look at this; it's good homework for me!
>>>>
>>>> This is very non-obvious, but here's what happens:
>>>>
>>>> KStreamReduce is a Processor of (K, V) => (K, Change<V>). I.e., it emits
>>>> new/old Change pairs as the value.
>>>>
>>>> Next is the Select (aka GroupBy). In the DSL code, this is the
>>>> KTableRepartitionMap (we call it a repartition when you select a new key,
>>>> since the new keys may belong to different partitions).
>>>> KTableRepartitionMap is a processor that does two things:
>>>> 1. it maps K => K1 (new keys) and V => V1 (new values)
>>>> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
>>>> null)]
>>>> In other words, it turns each Change event into two events: a retraction
>>>> and an update
>>>>
>>>> Next comes the reduce operation. In building the processor node for this
>>>> operation, we create the sink, repartition topic, and source, followed by
>>>> the actual Reduce node. So if you want to look at how the changes get
>>>> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
>>>> You'll see that the sink and source use a ChangedSerializer and ChangedDeserializer.
>>>>
>>>> By looking into those implementations, I found that they depend on each
>>>> Change containing just one of new OR old. They serialize the underlying
>>>> value using the serde you provide, along with a single byte that signifies
>>>> if the serialized value is the new or old value, which the deserializer
>>>> uses on the receiving end to turn it back into a Change(new, null) or
>>>> Change(null, old) as appropriate. This is why the repartition topic looks
>>>> like it's just the raw data. It basically is, except for the magic byte.
>>>>
>>>> Does that make sense?
>>>>
>>>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
>>>> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
>>>> leaving any feedback you have?
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <va...@sulatskov.net>
>>>> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Thanks for your explanation.
>>>>>
>>>>> I have an answer to the practical question, i.e. a null aggregator
>>>>> value should be interpreted as a fatal application error.
>>>>>
>>>>> On the other hand, looking at the app topology, I see that a message
>>>>> from KSTREAM-REDUCE-0000000002 / "table" goes to
>>>>> KTABLE-SELECT-0000000006 which in turn forwards data to
>>>>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
>>>>> this point I assume that data goes back to kafka into a *-repartition
>>>>> topic, after that the message is read from kafka by
>>>>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
>>>>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
>>>>> [aggregated-table]), where the actual aggregation takes place. What I
>>>>> don't get is where this Change value comes from, I mean if it's been
>>>>> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
>>>>> message goes through kafka where it gets serialized, and looking at
>>>>> kafka "repartition" topic, it contains regular values, not a pair of
>>>>> old/new.
>>>>>
>>>>> As far as I understand, Change is a purely in-memory representation of
>>>>> the state for a particular key, and at no point is it serialized back
>>>>> to kafka, yet somehow these Change values make it to the reducer. I feel
>>>>> like I am missing something here. Could you please clarify this?
>>>>>
>>>>> Can you please point me to a place in kafka-streams sources where a
>>>>> Change of newValue/oldValue is produced, so I could take a look? I
>>>>> found the KTableReduce implementation, but can't find what produces these
>>>>> Change values.
>>>>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <jo...@confluent.io> wrote:
>>>>>>
>>>>>> Hi again Vasily,
>>>>>>
>>>>>> Ok, it looks to me like this behavior is the result of the un-clean
>>>>>> topology change.
>>>>>>
>>>>>> Just in case you're interested, here's what I think happened.
>>>>>>
>>>>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table"
>>>>> )
>>>>>> internally emits pairs of "oldValue"/"newValue" . (side-note: It's by
>>>>>> forwarding both the old and new value that we are able to maintain
>>>>>> aggregates using the subtractor/adder pairs)
>>>>>>
>>>>>> 2. In the full topology, these old/new pairs go through some
>>>>>> transformations, but still in some form eventually make their way down to
>>>>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
>>>>>>
>>>>>> 3. The reduce processor logic looks like this:
>>>>>> final V oldAgg = store.get(key);
>>>>>> V newAgg = oldAgg;
>>>>>>
>>>>>> // first try to add the new value
>>>>>> if (value.newValue != null) {
>>>>>>     if (newAgg == null) {
>>>>>>         newAgg = value.newValue;
>>>>>>     } else {
>>>>>>         newAgg = addReducer.apply(newAgg, value.newValue);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> // then try to remove the old value
>>>>>> if (value.oldValue != null) {
>>>>>>     // Here's where the assumption breaks down...
>>>>>>     newAgg = removeReducer.apply(newAgg, value.oldValue);
>>>>>> }
>>>>>>
>>>>>> 4. Here's what I think happened. This processor saw an event like
>>>>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block,
>>>>> and
>>>>>> (since "oldValue != null") would go into the second block. I think that
>>>>> in
>>>>>> the normal case we can rely on the invariant that any value we get as an
>>>>>> "oldValue" is one that we've previously seen ( as "newValue" ).
>>>>>> Consequently, we should be able to assume that if we get a non-null
>>>>>> "oldValue", "newAgg" will also not be null (because we would have written
>>>>>> it to the store back when we saw it as "newValue" and then retrieved it
>>>>>> just now as "newAgg = oldAgg").
>>>>>>
>>>>>> However, if subtopology2, along with KTABLE-SELECT-0000000006
>>>>>> and KSTREAM-SINK-0000000013 get added after (KSTREAM-REDUCE-0000000002 /
>>>>>> "table") has already emitted some values, then we might in fact receive
>>>>> an
>>>>>> event with some "oldValue" that we have in fact never seen before
>>>>> (because (
>>>>>> KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when
>>>>> it
>>>>>> was first emitted as a "newValue").
>>>>>>
>>>>>> This would violate our assumption, and we would unintentionally send a
>>>>>> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
>>>>>> If you want to double-check my reasoning, you should be able to do so in
>>>>>> the debugger with a breakpoint in KTableReduce.
>>>>>>
>>>>>>
>>>>>> tl;dr: Supposing you reset the app when the topology changes, I think
>>>>> that
>>>>>> you should be able to rely on non-null aggregates being passed in to
>>>>> *both*
>>>>>> the adder and subtractor in a reduce.
>>>>>>
>>>>>> I would be in favor, as you suggested, of adding an explicit check and
>>>>>> throwing an exception if the aggregate is ever null at those points. This
>>>>>> would actually help us detect if the topology has changed unexpectedly
>>>>> and
>>>>>> shut down, hopefully before any damage is done. I'll send a PR and see
>>>>> what
>>>>>> everyone thinks.
>>>>>>
>>>>>> Does this all seem like it adds up to you?
>>>>>> -John
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi John,
>>>>>>>
>>>>>>> Thanks for your reply. I am not sure if this behavior I've observed is
>>>>>>> a bug or not, as I've not been resetting my application properly. On
>>>>>>> the other hand if the subtractor or adder in the reduce operation are
>>>>>>> never supposed to be called with null aggregator value, perhaps it
>>>>>>> would make sense to put a null check in the table reduce
>>>>>>> implementation to detect an application entering an invalid state. A
>>>>>>> bit like a check for topics having the same number of partitions when
>>>>>>> doing a join.
>>>>>>>
>>>>>>> Here's some information about my tests. Hope that can be useful:
>>>>>>>
>>>>>>> Topology at start:
>>>>>>>
>>>>>>> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
>>>>>>>    Sub-topology: 0
>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>>>       --> KSTREAM-MAP-0000000001
>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000003
>>>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>>>
>>>>>>>   Sub-topology: 1
>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>>>       --> KTABLE-TOSTREAM-0000000006
>>>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000007
>>>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
>>>>>>>       <-- KTABLE-TOSTREAM-0000000006
>>>>>>>
>>>>>>> This topology just takes data from the source topic "slope" which
>>>>>>> produces messages like this:
>>>>>>>
>>>>>>> key1
>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>>>> key3
>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>>>> key2
>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>>>> key3
>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>>> key1
>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>>> key2
>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>>>
>>>>>>> Every second, 3 new messages arrive from the "slope" topic for
>>>>>>> keys key1, key2 and key3, with constantly increasing value.
>>>>>>> Data is transformed so that the original key is also tracked in the
>>>>>>> message value, grouped by key, and windowed with a custom window, and
>>>>>>> reduced with a dummy reduce operation to make a KTable.
>>>>>>> KTable is converted back to a stream, and sent to a topic (just for
>>>>>>> debugging purposes).
>>>>>>>
>>>>>>> Here's the source (it's kafka-streams-scala for the most part). Also
>>>>>>> please ignore silly classes, it's obviously a test:
>>>>>>>
>>>>>>>     val slopeTable = builder
>>>>>>>       .stream[String, TimedValue]("slope")
>>>>>>>       .map(
>>>>>>>         (key, value) =>
>>>>>>>           (
>>>>>>>             StringWrapper(key),
>>>>>>>             TimedValueWithKey(value = value.value, timestamp =
>>>>>>> value.timestamp, key = key)
>>>>>>>         )
>>>>>>>       )
>>>>>>>       .groupByKey
>>>>>>>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
>>>>>>>       .reduceMat((aggValue, newValue) => newValue, "table")
>>>>>>>
>>>>>>>     slopeTable.toStream
>>>>>>>       .to("slope-table")
>>>>>>>
>>>>>>> Topology after change without a proper reset:
>>>>>>>
>>>>>>> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
>>>>>>>    Sub-topology: 0
>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>>>       --> KSTREAM-MAP-0000000001
>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000003
>>>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>>>
>>>>>>>   Sub-topology: 1
>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
>>>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000007
>>>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000013
>>>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
>>>>>>>       <-- KTABLE-SELECT-0000000006
>>>>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
>>>>>>>       <-- KTABLE-TOSTREAM-0000000012
>>>>>>>
>>>>>>>   Sub-topology: 2
>>>>>>>     Source: KSTREAM-SOURCE-0000000008 (topics:
>>>>>>> [aggregated-table-repartition])
>>>>>>>       --> KTABLE-REDUCE-0000000009
>>>>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
>>>>>>>       --> KTABLE-TOSTREAM-0000000010
>>>>>>>       <-- KSTREAM-SOURCE-0000000008
>>>>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000011
>>>>>>>       <-- KTABLE-REDUCE-0000000009
>>>>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
>>>>>>>       <-- KTABLE-TOSTREAM-0000000010
>>>>>>>
>>>>>>> Here's the source of the sub-topology that does table aggregation:
>>>>>>>
>>>>>>>     slopeTable
>>>>>>>       .groupBy(
>>>>>>>         (key, value) => (new Windowed(StringWrapper("dummykey"),
>>>>>>> key.window()), value)
>>>>>>>       )
>>>>>>>       .reduceMat(adder = (aggValue, newValue) => {
>>>>>>>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
>>>>>>>         val agg = Option(aggValue)
>>>>>>>         TimedValueWithKey(
>>>>>>>           value = agg.map(_.value).getOrElse(0) + newValue.value,
>>>>>>>           timestamp =
>>>>>>>
>>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
>>>>>>> newValue.timestamp),
>>>>>>>           key = "reduced"
>>>>>>>         )
>>>>>>>       }, subtractor = (aggValue, newValue) => {
>>>>>>>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
>>>>>>>         val agg = Option(aggValue)
>>>>>>>         TimedValueWithKey(
>>>>>>>           value = agg.map(_.value).getOrElse(0) - newValue.value,
>>>>>>>           timestamp =
>>>>>>>
>>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
>>>>>>> newValue.timestamp),
>>>>>>>           key = "reduced"
>>>>>>>         )
>>>>>>>       }, "aggregated-table")
>>>>>>>       .toStream
>>>>>>>       .to("slope-aggregated-table")
>>>>>>>
>>>>>>> I log all calls to adder and subtractor, so I am able to see what's
>>>>>>> going on there, as well as I track the original keys of the aggregated
>>>>>>> values and their timestamps, so it's relatively easy to see how the
>>>>>>> data goes through this topology
>>>>>>>
>>>>>>> In order to reproduce this behavior I need to:
>>>>>>> 1. Start a full topology (with table aggregation)
>>>>>>> 2. Start without table aggregation (no app reset)
>>>>>>> 3. Start with table aggregation (no app reset)
>>>>>>>
>>>>>>> Below is an interpretation of the adder/subtractor logs for a given
>>>>>>> key/window in chronological order:
>>>>>>>
>>>>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
>>>>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
>>>>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
>>>>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
>>>>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
>>>>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
>>>>>>>
>>>>>>> And in the end the last value that's materialized for that window
>>>>>>> (i.e. windowed key) in the kafka topic is 57, i.e. an increase in value
>>>>>>> for a single key between some point in the middle of the window and at
>>>>>>> the end of the window, times 3. As opposed to the expected value of
>>>>>>> 751 * 3 = 2253 (sum of last values in a time window for all keys being
>>>>>>> aggregated).
>>>>>>>
>>>>>>> It's clear to me that I should do an application reset, but I also
>>>>>>> would like to understand, should I expect adder/subtractor being
>>>>>>> called with null aggValue, or is it a clear sign that something went
>>>>>>> horribly wrong?
>>>>>>>
>>>>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io>
>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Vasily,
>>>>>>>>
>>>>>>>> Thanks for the email.
>>>>>>>>
>>>>>>>> To answer your question: you should reset the application basically
>>>>> any
>>>>>>>> time you change the topology. Some transitions are safe, but others
>>>>> will
>>>>>>>> result in data loss or corruption. Rather than try to reason about
>>>>> which
>>>>>>> is
>>>>>>>> which, it's much safer just to either reset the app or not change it
>>>>> (if
>>>>>>> it
>>>>>>>> has important state).
>>>>>>>>
>>>>>>>> Beyond changes that you make to the topology, we spend a lot of
>>>>> effort to
>>>>>>>> try and make sure that different versions of Streams will produce the
>>>>>>> same
>>>>>>>> topology, so unless the release notes say otherwise, you should be
>>>>> able
>>>>>>> to
>>>>>>>> upgrade without a reset.
>>>>>>>>
>>>>>>>>
>>>>>>>> I can't say right now whether those wacky behaviors are bugs or the
>>>>>>> result
>>>>>>>> of changing the topology without a reset. Or if they are correct but
>>>>>>>> surprising behavior somehow. I'll look into it tomorrow. Do feel
>>>>> free to
>>>>>>>> open a Jira ticket if you think you have found a bug, especially if
>>>>> you
>>>>>>> can
>>>>>>>> describe a repro. Knowing your topology before and after the change
>>>>> would
>>>>>>>> also be immensely helpful. You can print it with Topology.describe().
>>>>>>>>
>>>>>>>> Regardless, I'll make a note to take a look at the code tomorrow and
>>>>> try
>>>>>>> to
>>>>>>>> decide if you should expect these behaviors with "clean" topology
>>>>>>> changes.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <
>>>>> vasily@sulatskov.net>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am doing some experiments with kafka-streams KGroupedTable
>>>>>>>>> aggregation, and admittedly I am not wiping data properly on each
>>>>>>>>> restart, partially because I also wonder what would happen if you
>>>>>>>>> change a streams topology without doing a proper reset.
>>>>>>>>>
>>>>>>>>> I've noticed that from time to time, kafka-streams
>>>>>>>>> KGroupedTable.reduce() can call subtractor function with null
>>>>>>>>> aggregator value, and if you try to work around that, by
>>>>> interpreting
>>>>>>>>> null aggregator value as zero for numeric value you get incorrect
>>>>>>>>> aggregation result.
>>>>>>>>>
>>>>>>>>> I do understand that the proper way of handling this is to do a
>>>>> reset
>>>>>>>>> on topology changes, but I'd like to understand if there's any
>>>>>>>>> legitimate case when kafka-streams can call an adder or a subtractor
>>>>>>>>> with null aggregator value, and should I plan for this, or should I
>>>>>>>>> interpret this as an invalid state, and terminate the application,
>>>>> and
>>>>>>>>> do a proper reset?
>>>>>>>>>
>>>>>>>>> Also, I can't seem to find a guide which explains when application
>>>>>>>>> reset is necessary. Intuitively it seems that it should be done
>>>>> every
>>>>>>>>> time a topology changes. Any other cases?
>>>>>>>>>
>>>>>>>>> I tried to debug where the null value comes from and it seems that
>>>>>>>>> KTableReduce.process() is getting called with Change<V> value with
>>>>>>>>> newValue == null, and some non-null oldValue, which leads to the
>>>>>>>>> subtractor being called with null aggregator value. I wonder how it is
>>>>>>>>> possible to have an old value for a key without a new value (does
>>>>> it
>>>>>>>>> happen because of the auto commit interval?).
>>>>>>>>>
>>>>>>>>> I've also noticed that it's possible for an input value from a
>>>>> topic
>>>>>>>>> to bypass aggregation function entirely and be directly
>>>>> transmitted to
>>>>>>>>> the output in certain cases: oldAgg is null, newValue is not null
>>>>> and
>>>>>>>>> oldValue is null - in that case newValue will be transmitted
>>>>> directly
>>>>>>>>> to the output. I suppose it's the correct behaviour, but feels a
>>>>> bit
>>>>>>>>> weird nonetheless. And I've actually been able to observe this
>>>>>>>>> behaviour in practice. I suppose it's also caused by this happening
>>>>>>>>> right before a commit happens, and the message is sent to a
>>>>> changelog
>>>>>>>>> topic.
>>>>>>>>>
>>>>>>>>> Please can someone with more knowledge shed some light on these
>>>>> issues?
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Vasily Sulatskov
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Vasily Sulatskov
>>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Vasily Sulatskov
>>>>>
>>>
>>>
>>>
>>
> 
> 


Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by Vasily Sulatskov <va...@sulatskov.net>.
Hi,

It seems that this wouldn't be that difficult to address: just don't
break Change(newVal, oldVal) into Change(newVal, null) /
Change(null, oldVal), and update the aggregator value in one .process()
call.

Would this change make sense?
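
To make the question concrete, here is a minimal sketch in plain Java with no
Kafka dependency. Everything in it is a hypothetical stand-in: the Change
class, reduce() and observed() are not the Kafka Streams internals. The reduce
logic only follows the KTableReduce snippet quoted further down in this
thread, with integer addition and subtraction as adder and subtractor, plus an
explicit null check of the kind discussed there. The numbers (732, 751) are
borrowed from the logs in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChangeSketch {
    /** Simplified stand-in for the internal new/old pair; not the real Kafka Streams class. */
    static final class Change {
        final Integer newValue;
        final Integer oldValue;

        Change(Integer newValue, Integer oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    /** One process() call: add the new value first, then retract the old one. */
    static Integer reduce(Integer oldAgg, Change change) {
        Integer newAgg = oldAgg;
        if (change.newValue != null) {
            newAgg = (newAgg == null) ? change.newValue : newAgg + change.newValue;
        }
        if (change.oldValue != null) {
            if (newAgg == null) {
                // Assumption: a retraction without an aggregate is treated as an invalid state.
                throw new IllegalStateException("retraction received but no aggregate stored");
            }
            newAgg = newAgg - change.oldValue;
        }
        return newAgg;
    }

    /** Applies the changes in order and records the aggregate stored after each call. */
    static List<Integer> observed(Integer initial, List<Change> changes) {
        List<Integer> seen = new ArrayList<>();
        Integer agg = initial;
        for (Change c : changes) {
            agg = reduce(agg, c);
            seen.add(agg);
        }
        return seen;
    }

    public static void main(String[] args) {
        // The aggregate holds 3 * 732 = 2196 and one key moves from 732 to 751.
        // Split into retraction + update: an intermediate aggregate is stored.
        System.out.println(observed(2196,
                Arrays.asList(new Change(null, 732), new Change(751, null)))); // [1464, 2215]
        // One combined Change: only the final aggregate is ever stored.
        System.out.println(observed(2196,
                Arrays.asList(new Change(751, 732)))); // [2215]
    }
}
```

With the split pair, anything that reads the store between the two calls sees
1464; with a single combined Change only 2215 is ever stored. As far as I can
tell, though, the split may still be needed in the general case: after a
groupBy the old and the new value can map to different keys (and so to
different partitions), and then one combined message cannot reach both
aggregates.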
On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax <ma...@confluent.io> wrote:
>
> Vasily,
>
> yes, it can happen. As you noticed, both messages might be processed on
> different machines. Thus, Kafka Streams provides 'eventual consistency'
> guarantees.
>
>
> -Matthias
>
> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> > Hi John,
> >
> > Thanks a lot for your explanation. It does make much more sense now.
> >
> > I think the Jira issue is pretty well explained (with a reference to
> > this thread), and I've left my 2 cents in the pull request.
> >
> > You are right, I didn't notice that the repartition topic effectively
> > contains the same message twice. The 0/1 bytes are non-printable, so I
> > missed them when I used kafka-console-consumer. A quick suggestion:
> > wouldn't it make sense to change the 0 and 1 bytes to something with
> > visible ASCII characters, say + and -, as these messages are
> > effectively commands to the reducer to execute either an addition or
> > a subtraction?
> >
> > On a more serious side, can you please explain the temporal aspects of
> > how change messages are handled? More specifically, is it guaranteed
> > that both Change(newValue, null) and Change(null, oldValue) are
> > handled before a new aggregated value is committed to an output topic?
> > Change(newValue, null) and Change(null, oldValue) are delivered as two
> > separate messages via a kafka topic. When they are read from the
> > topic (possibly on a different machine, whose commit interval is
> > asynchronous to that of the machine that put these changes into the
> > topic), can it happen that a Change(newValue, null) is processed by a
> > KTableReduceProcessor, the value of the aggregator is updated and
> > committed to the changelog topic, and the Change(null, oldValue) is
> > processed only in the next commit interval? If I understand this
> > correctly, that would mean that an incorrect aggregated value will be
> > observed briefly in the aggregated table, before being eventually
> > corrected.
> >
> > Can that happen? Or am I missing something that would make it impossible?
> > On Fri, Jul 13, 2018 at 8:05 PM John Roesler <jo...@confluent.io> wrote:
> >>
> >> Hi Vasily,
> >>
> >> I'm glad you're making me look at this; it's good homework for me!
> >>
> >> This is very non-obvious, but here's what happens:
> >>
> >> KStreamReduce is a Processor of (K, V) => (K, Change<V>), i.e., it emits
> >> new/old Change pairs as the value.
> >>
> >> Next is the Select (aka GroupBy). In the DSL code, this is the
> >> KTableRepartitionMap (we call it a repartition when you select a new key,
> >> since the new keys may belong to different partitions).
> >> KTableRepartitionMap is a processor that does two things:
> >> 1. it maps K => K1 (new keys) and V => V1 (new values)
> >> 2. it "explodes" Change(new, old) into
> >>    [Change(null, old), Change(new, null)]
> >> In other words, it turns each Change event into two events: a retraction
> >> and an update.
> >>
> >> Next comes the reduce operation. In building the processor node for this
> >> operation, we create the sink, repartition topic, and source, followed by
> >> the actual Reduce node. So if you want to look at how the changes get
> >> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
> >> You'll see that the sink and source use a ChangedSerializer and
> >> ChangedDeserializer.
> >>
> >> By looking into those implementations, I found that they depend on each
> >> Change containing just one of new OR old. They serialize the underlying
> >> value using the serde you provide, along with a single byte that signifies
> >> if the serialized value is the new or old value, which the deserializer
> >> uses on the receiving end to turn it back into a Change(new, null) or
> >> Change(null, old) as appropriate. This is why the repartition topic looks
> >> like it's just the raw data. It basically is, except for the magic byte.
> >>
> >> Does that make sense?
> >>
> >> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
> >> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
> >> leaving any feedback you have?
> >>
> >> Thanks,
> >> -John
> >>
> >> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <va...@sulatskov.net>
> >> wrote:
> >>
> >>> Hi John,
> >>>
> >>> Thanks for your explanation.
> >>>
> >>> I have an answer to the practical question, i.e. a null aggregator
> >>> value should be interpreted as a fatal application error.
> >>>
> >>> On the other hand, looking at the app topology, I see that a message
> >>> from KSTREAM-REDUCE-0000000002 / "table" goes to
> >>> KTABLE-SELECT-0000000006 which in turn forwards data to
> >>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
> >>> this point I assume that data goes back to kafka into a *-repartition
> >>> topic, after that the message is read from kafka by
> >>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
> >>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
> >>> [aggregated-table]), where the actual aggregation takes place. What I
> >>> don't get is where this Change value comes from, I mean if it's been
> >>> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
> >>> message goes through kafka where it gets serialized, and looking at
> >>> kafka "repartition" topic, it contains regular values, not a pair of
> >>> old/new.
> >>>
> >>> As far as I understand, Change is a purely in-memory representation of
> >>> the state for a particular key, and at no point is it serialized back
> >>> to kafka, yet somehow these Change values make it to the reducer. I feel
> >>> like I am missing something here. Could you please clarify this?
> >>>
> >>> Can you please point me to a place in kafka-streams sources where a
> >>> Change of newValue/oldValue is produced, so I could take a look? I
> >>> found the KTableReduce implementation, but can't find what produces
> >>> these Change values.
> >>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <jo...@confluent.io> wrote:
> >>>>
> >>>> Hi again Vasily,
> >>>>
> >>>> Ok, it looks to me like this behavior is the result of the un-clean
> >>>> topology change.
> >>>>
> >>>> Just in case you're interested, here's what I think happened.
> >>>>
> >>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table")
> >>>> internally emits pairs of "oldValue"/"newValue". (Side note: it's by
> >>>> forwarding both the old and new value that we are able to maintain
> >>>> aggregates using the subtractor/adder pairs.)
> >>>>
> >>>> 2. In the full topology, these old/new pairs go through some
> >>>> transformations, but still in some form eventually make their way down to
> >>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
> >>>>
> >>>> 3. The reduce processor logic looks like this:
> >>>> final V oldAgg = store.get(key);
> >>>> V newAgg = oldAgg;
> >>>>
> >>>> // first try to add the new value
> >>>> if (value.newValue != null) {
> >>>>     if (newAgg == null) {
> >>>>         newAgg = value.newValue;
> >>>>     } else {
> >>>>         newAgg = addReducer.apply(newAgg, value.newValue);
> >>>>     }
> >>>> }
> >>>>
> >>>> // then try to remove the old value
> >>>> if (value.oldValue != null) {
> >>>>     // Here's where the assumption breaks down...
> >>>>     newAgg = removeReducer.apply(newAgg, value.oldValue);
> >>>> }
> >>>>
> >>>> 4. Here's what I think happened. This processor saw an event like
> >>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block,
> >>>> and (since "oldValue != null") would go into the second block. I think
> >>>> that in the normal case we can rely on the invariant that any value we
> >>>> get as an "oldValue" is one that we've previously seen (as "newValue").
> >>>> Consequently, we should be able to assume that if we get a non-null
> >>>> "oldValue", "newAgg" will also not be null (because we would have written
> >>>> it to the store back when we saw it as "newValue" and then retrieved it
> >>>> just now as "newAgg = oldAgg").
> >>>>
> >>>> However, if subtopology2, along with KTABLE-SELECT-0000000006
> >>>> and KSTREAM-SINK-0000000013 get added after (KSTREAM-REDUCE-0000000002 /
> >>>> "table") has already emitted some values, then we might in fact receive
> >>>> an event with some "oldValue" that we have in fact never seen before
> >>>> (because (KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the
> >>>> topology when it was first emitted as a "newValue").
> >>>>
> >>>> This would violate our assumption, and we would unintentionally send a
> >>>> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
> >>>> If you want to double-check my reasoning, you should be able to do so in
> >>>> the debugger with a breakpoint in KTableReduce.
> >>>>
> >>>>
> >>>> tl;dr: Supposing you reset the app when the topology changes, I think
> >>>> that you should be able to rely on non-null aggregates being passed in
> >>>> to *both* the adder and subtractor in a reduce.
> >>>>
> >>>> I would be in favor, as you suggested, of adding an explicit check and
> >>>> throwing an exception if the aggregate is ever null at those points. This
> >>>> would actually help us detect if the topology has changed unexpectedly
> >>>> and shut down, hopefully before any damage is done. I'll send a PR and
> >>>> see what everyone thinks.
> >>>>
> >>>> Does this all seem like it adds up to you?
> >>>> -John
> >>>>
> >>>>
> >>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
> >>>> wrote:
> >>>>
> >>>>> Hi John,
> >>>>>
> >>>>> Thanks for your reply. I am not sure if this behavior I've observed is
> >>>>> a bug or not, as I've not been resetting my application properly. On
> >>>>> the other hand if the subtractor or adder in the reduce operation are
> >>>>> never supposed to be called with null aggregator value, perhaps it
> >>>>> would make sense to put a null check in the table reduce
> >>>>> implementation to detect an application entering an invalid state. A
> >>>>> bit like a check for topics having the same number of partitions when
> >>>>> doing a join.
> >>>>>
> >>>>> Here's some information about my tests. Hope that can be useful:
> >>>>>
> >>>>> Topology at start:
> >>>>>
> >>>>> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
> >>>>>    Sub-topology: 0
> >>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>       --> KSTREAM-MAP-0000000001
> >>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000003
> >>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>
> >>>>>   Sub-topology: 1
> >>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>       --> KTABLE-TOSTREAM-0000000006
> >>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000007
> >>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
> >>>>>       <-- KTABLE-TOSTREAM-0000000006
> >>>>>
> >>>>> This topology just takes data from the source topic "slope" which
> >>>>> produces messages like this:
> >>>>>
> >>>>> key1
> >>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>> key3
> >>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>> key2
> >>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>> key3
> >>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>> key1
> >>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>> key2
> >>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>
> >>>>> Every second, 3 new messages arrive from the "slope" topic for
> >>>>> keys key1, key2 and key3, with constantly increasing value.
> >>>>> Data is transformed so that the original key is also tracked in the
> >>>>> message value, grouped by key, and windowed with a custom window, and
> >>>>> reduced with a dummy reduce operation to make a KTable.
> >>>>> KTable is converted back to a stream, and sent to a topic (just for
> >>>>> debugging purposes).
> >>>>>
> >>>>> Here's the source (it's kafka-streams-scala for the most part). Also
> >>>>> please ignore silly classes, it's obviously a test:
> >>>>>
> >>>>>     val slopeTable = builder
> >>>>>       .stream[String, TimedValue]("slope")
> >>>>>       .map(
> >>>>>         (key, value) =>
> >>>>>           (
> >>>>>             StringWrapper(key),
> >>>>>             TimedValueWithKey(value = value.value, timestamp = value.timestamp, key = key)
> >>>>>           )
> >>>>>       )
> >>>>>       .groupByKey
> >>>>>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
> >>>>>       .reduceMat((aggValue, newValue) => newValue, "table")
> >>>>>
> >>>>>     slopeTable.toStream
> >>>>>       .to("slope-table")
> >>>>>
> >>>>> Topology after change without a proper reset:
> >>>>>
> >>>>> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
> >>>>>    Sub-topology: 0
> >>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>       --> KSTREAM-MAP-0000000001
> >>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000003
> >>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>
> >>>>>   Sub-topology: 1
> >>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
> >>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000007
> >>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000013
> >>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
> >>>>>       <-- KTABLE-SELECT-0000000006
> >>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
> >>>>>       <-- KTABLE-TOSTREAM-0000000012
> >>>>>
> >>>>>   Sub-topology: 2
> >>>>>     Source: KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition])
> >>>>>       --> KTABLE-REDUCE-0000000009
> >>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
> >>>>>       --> KTABLE-TOSTREAM-0000000010
> >>>>>       <-- KSTREAM-SOURCE-0000000008
> >>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000011
> >>>>>       <-- KTABLE-REDUCE-0000000009
> >>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
> >>>>>       <-- KTABLE-TOSTREAM-0000000010
> >>>>>
> >>>>> Here's the source of the sub-topology that does table aggregation:
> >>>>>
> >>>>>     slopeTable
> >>>>>       .groupBy(
> >>>>>         (key, value) =>
> >>>>>           (new Windowed(StringWrapper("dummykey"), key.window()), value)
> >>>>>       )
> >>>>>       .reduceMat(adder = (aggValue, newValue) => {
> >>>>>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
> >>>>>         val agg = Option(aggValue)
> >>>>>         TimedValueWithKey(
> >>>>>           value = agg.map(_.value).getOrElse(0) + newValue.value,
> >>>>>           timestamp = Utils.latestTimestamp(
> >>>>>             agg.map(_.timestamp).getOrElse(zeroTimestamp),
> >>>>>             newValue.timestamp),
> >>>>>           key = "reduced"
> >>>>>         )
> >>>>>       }, subtractor = (aggValue, newValue) => {
> >>>>>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
> >>>>>         val agg = Option(aggValue)
> >>>>>         TimedValueWithKey(
> >>>>>           value = agg.map(_.value).getOrElse(0) - newValue.value,
> >>>>>           timestamp = Utils.latestTimestamp(
> >>>>>             agg.map(_.timestamp).getOrElse(zeroTimestamp),
> >>>>>             newValue.timestamp),
> >>>>>           key = "reduced"
> >>>>>         )
> >>>>>       }, "aggregated-table")
> >>>>>       .toStream
> >>>>>       .to("slope-aggregated-table")
> >>>>>
> >>>>> I log all calls to the adder and subtractor, so I am able to see what's
> >>>>> going on there. I also track the original keys of the aggregated
> >>>>> values and their timestamps, so it's relatively easy to see how the
> >>>>> data goes through this topology.
> >>>>>
> >>>>> In order to reproduce this behavior I need to:
> >>>>> 1. Start a full topology (with table aggregation)
> >>>>> 2. Start without table aggregation (no app reset)
> >>>>> 3. Start with table aggregation (no app reset)
> >>>>>
> >>>>> Below is an interpretation of the adder/subtractor logs for a given
> >>>>> key/window in chronological order:
> >>>>>
> >>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> >>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> >>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> >>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> >>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> >>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
> >>>>>
> >>>>> In the end, the last value that's materialized for that window
> >>>>> (i.e. windowed key) in the kafka topic is 57, i.e. the increase in value
> >>>>> for a single key between some point in the middle of the window and
> >>>>> the end of the window, 751 - 732 = 19, times 3, as opposed to the
> >>>>> expected value of 751 * 3 = 2253 (sum of last values in a time window
> >>>>> for all keys being aggregated).
> >>>>>
> >>>>> It's clear to me that I should do an application reset, but I also
> >>>>> would like to understand, should I expect adder/subtractor being
> >>>>> called with null aggValue, or is it a clear sign that something went
> >>>>> horribly wrong?
> >>>>>
> >>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io>
> >>> wrote:
> >>>>>>
> >>>>>> Hi Vasily,
> >>>>>>
> >>>>>> Thanks for the email.
> >>>>>>
> >>>>>> To answer your question: you should reset the application basically any
> >>>>>> time you change the topology. Some transitions are safe, but others will
> >>>>>> result in data loss or corruption. Rather than try to reason about which
> >>>>>> is which, it's much safer just to either reset the app or not change it
> >>>>>> (if it has important state).
> >>>>>>
> >>>>>> Beyond changes that you make to the topology, we spend a lot of effort
> >>>>>> to try and make sure that different versions of Streams will produce
> >>>>>> the same topology, so unless the release notes say otherwise, you
> >>>>>> should be able to upgrade without a reset.
> >>>>>>
> >>>>>>
> >>>>>> I can't say right now whether those wacky behaviors are bugs or the
> >>>>>> result of changing the topology without a reset. Or if they are correct
> >>>>>> but surprising behavior somehow. I'll look into it tomorrow. Do feel
> >>>>>> free to open a Jira ticket if you think you have found a bug,
> >>>>>> especially if you can describe a repro. Knowing your topology before
> >>>>>> and after the change would also be immensely helpful. You can print it
> >>>>>> with Topology.describe().
> >>>>>>
> >>>>>> Regardless, I'll make a note to take a look at the code tomorrow and
> >>>>>> try to decide if you should expect these behaviors with "clean"
> >>>>>> topology changes.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <
> >>> vasily@sulatskov.net>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I am doing some experiments with kafka-streams KGroupedTable
> >>>>>>> aggregation, and admittedly I am not wiping data properly on each
> >>>>>>> restart, partially because I also wonder what would happen if you
> >>>>>>> change a streams topology without doing a proper reset.
> >>>>>>>
> >>>>>>> I've noticed that from time to time, kafka-streams
> >>>>>>> KGroupedTable.reduce() can call the subtractor function with a null
> >>>>>>> aggregator value, and if you try to work around that by interpreting
> >>>>>>> a null aggregator value as zero for a numeric value, you get an
> >>>>>>> incorrect aggregation result.
> >>>>>>>
> >>>>>>> I do understand that the proper way of handling this is to do a reset
> >>>>>>> on topology changes, but I'd like to understand if there's any
> >>>>>>> legitimate case when kafka-streams can call an adder or a subtractor
> >>>>>>> with a null aggregator value, and should I plan for this, or should I
> >>>>>>> interpret this as an invalid state, terminate the application, and
> >>>>>>> do a proper reset?
> >>>>>>>
> >>>>>>> Also, I can't seem to find a guide which explains when application
> >>>>>>> reset is necessary. Intuitively it seems that it should be done
> >>> every
> >>>>>>> time a topology changes. Any other cases?
> >>>>>>>
> >>>>>>> I tried to debug where the null value comes from and it seems that
> >>>>>>> KTableReduce.process() is getting called with Change<V> value with
> >>>>>>> newValue == null, and some non-null oldValue. Which leads to the
> >>>>>>> subtractor being called with null aggregator value. I wonder how
> >>> it is
> >>>>>>> possible to have an old value for a key without a new value (does
> >>> it
> >>>>>>> happen because of the auto commit interval?).
> >>>>>>>
> >>>>>>> I've also noticed that it's possible for an input value from a
> >>> topic
> >>>>>>> to bypass aggregation function entirely and be directly
> >>> transmitted to
> >>>>>>> the output in certain cases: oldAgg is null, newValue is not null
> >>> and
> >>>>>>> oldValue is null - in that case newValue will be transmitted
> >>> directly
> >>>>>>> to the output. I suppose it's the correct behaviour, but feels a
> >>> bit
> >>>>>>> weird nonetheless. And I've actually been able to observe this
> >>>>>>> behaviour in practice. I suppose it's also caused by this happening
> >>>>>>> right before a commit happens, and the message is sent to a
> >>> changelog
> >>>>>>> topic.
> >>>>>>>
> >>>>>>> Please can someone with more knowledge shed some light on these
> >>> issues?
> >>>>>>>
> >>>>>>> --
> >>>>>>> Best regards,
> >>>>>>> Vasily Sulatskov
> >>>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Best regards,
> >>>>> Vasily Sulatskov
> >>>>>
> >>>
> >>>
> >>>
> >>> --
> >>> Best regards,
> >>> Vasily Sulatskov
> >>>
> >
> >
> >
>


-- 
Best regards,
Vasily Sulatskov

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Vasily,

Yes, it can happen. As you noticed, both messages might be processed on
different machines. Thus, Kafka Streams provides 'eventual consistency'
guarantees.
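
To make the transient state concrete, here is a minimal, self-contained sketch in plain Java with no Kafka dependency. The class name, the single "all" key, and the integer sum are illustrative assumptions, not Kafka Streams code. It only mimics the add-then-subtract reduce logic quoted further down in this thread and records the aggregate after every message, as if a commit happened after each one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransientAggregateSketch {
    // Hypothetical stand-in for the internal Change<V>: exactly one side is non-null.
    static final class Change {
        final Integer newValue;
        final Integer oldValue;

        Change(Integer newValue, Integer oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    private final Map<String, Integer> store = new HashMap<>();
    // Every aggregate a downstream reader could observe if each message were committed.
    final List<Integer> observed = new ArrayList<>();

    void process(String key, Change change) {
        Integer agg = store.get(key);
        if (change.newValue != null) {
            // Adder: the first value for a key becomes the aggregate as-is.
            agg = (agg == null) ? change.newValue : agg + change.newValue;
        }
        if (change.oldValue != null) {
            // Subtractor: a null aggregate here means the state is inconsistent.
            if (agg == null) {
                throw new IllegalStateException("retraction for an unknown aggregate");
            }
            agg = agg - change.oldValue;
        }
        store.put(key, agg);
        observed.add(agg);
    }

    static List<Integer> run() {
        TransientAggregateSketch reducer = new TransientAggregateSketch();
        reducer.process("all", new Change(10, null)); // key1 = 10
        reducer.process("all", new Change(20, null)); // key2 = 20
        // key1 changes from 10 to 15; the change arrives as two separate messages.
        reducer.process("all", new Change(null, 10)); // retraction
        reducer.process("all", new Change(15, null)); // update
        return reducer.observed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running it prints [10, 30, 20, 35]. The value 20 is the intermediate aggregate that exists between the retraction and the update; if a commit falls between the two messages, that value is what readers of the table see until the update is processed.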


-Matthias

On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> Hi John,
> 
> Thanks a lot for your explanation. It does make much more sense now.
> 
> I think the Jira issue is pretty well explained (with a reference to
> this thread), and I've left my 2 cents in the pull request.
> 
> You are right: I didn't notice that the repartition topic effectively
> contains the same message twice. The 0/1 flag bytes are non-printable,
> so they didn't show up when I used kafka-console-consumer. A quick
> suggestion: wouldn't it make sense to change the 0 and 1 bytes to
> something with visible ASCII characters, say + and -, since these
> messages are effectively commands telling the reducer to execute
> either an addition or a subtraction?
> 
> On a more serious note, can you please explain the temporal aspects of
> how change messages are handled? More specifically, is it guaranteed
> that both Change(newValue, null) and Change(null, oldValue) are
> handled before a new aggregated value is committed to an output topic?
> Change(newValue, null) and Change(null, oldValue) are delivered as two
> separate messages via a Kafka topic. When they are read from that
> topic (possibly on a different machine, whose commit interval is
> asynchronous to that of the machine that put these changes into the
> topic), can it happen that a Change(newValue, null) is processed by a
> KTableReduceProcessor, the value of the aggregator is updated and
> committed to the changelog topic, and the Change(null, oldValue) is
> processed only in the next commit interval? If I understand this
> correctly, that would mean that an incorrect aggregated value will be
> observed briefly in the aggregated table before eventually being
> corrected.
> 
> Can that happen? Or am I missing something that would make it impossible?
> On Fri, Jul 13, 2018 at 8:05 PM John Roesler <jo...@confluent.io> wrote:
>>
>> Hi Vasily,
>>
>> I'm glad you're making me look at this; it's good homework for me!
>>
>> This is very non-obvious, but here's what happens:
>>
>> KStreamReduce is a Processor of (K, V) => (K, Change<V>). I.e., it emits
>> new/old Change pairs as the value.
>>
>> Next is the Select (aka GroupBy). In the DSL code, this is the
>> KTableRepartitionMap (we call it a repartition when you select a new key,
>> since the new keys may belong to different partitions).
>> KTableRepartitionMap is a processor that does two things:
>> 1. it maps K => K1 (new keys) and V => V1 (new values)
>> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
>> null)]
>> In other words, it turns each Change event into two events: a retraction
>> and an update
>>
>> Next comes the reduce operation. In building the processor node for this
>> operation, we create the sink, repartition topic, and source, followed by
>> the actual Reduce node. So if you want to look at how the changes get
>> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
>> You'll see that the sink and source use a ChangedSerializer and
>> ChangedDeserializer.
>>
>> By looking into those implementations, I found that they depend on each
>> Change containing just one of new OR old. They serialize the underlying
>> value using the serde you provide, along with a single byte that signifies
>> if the serialized value is the new or old value, which the deserializer
>> uses on the receiving end to turn it back into a Change(new, null) or
>> Change(null, old) as appropriate. This is why the repartition topic looks
>> like it's just the raw data. It basically is, except for the magic byte.
>>
>> Does that make sense?
>>
>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
>> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
>> leaving any feedback you have?
>>
>> Thanks,
>> -John
>>
>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <va...@sulatskov.net>
>> wrote:
>>
>>> Hi John,
>>>
>>> Thanks for your explanation.
>>>
>>> I have an answer to the practical question, i.e. a null aggregator
>>> value should be interpreted as a fatal application error.
>>>
>>> On the other hand, looking at the app topology, I see that a message
>>> from KSTREAM-REDUCE-0000000002 / "table" goes to
>>> KTABLE-SELECT-0000000006 which in turn forwards data to
>>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
>>> this point I assume that data goes back to kafka into a *-repartition
>>> topic, after that the message is read from kafka by
>>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
>>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
>>> [aggregated-table]), where the actual aggregation takes place. What I
>>> don't get is where this Change value comes from, I mean if it's been
>>> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
>>> message goes through kafka where it gets serialized, and looking at
>>> kafka "repartition" topic, it contains regular values, not a pair of
>>> old/new.
>>>
>>> As far as I understand, Change is a purely in-memory representation of
>>> the state for a particular key, and at no point is it serialized back
>>> to kafka, yet somehow these Change values make it to the reducer. I feel
>>> like I am missing something here. Could you please clarify this?
>>>
>>> Can you please point me to a place in kafka-streams sources where a
>>> Change of newValue/oldValue is produced, so I could take a look? I
>>> found the KTableReduce implementation, but can't find what produces these
>>> Change values.
>>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <jo...@confluent.io> wrote:
>>>>
>>>> Hi again Vasily,
>>>>
>>>> Ok, it looks to me like this behavior is the result of the un-clean
>>>> topology change.
>>>>
>>>> Just in case you're interested, here's what I think happened.
>>>>
>>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table"
>>> )
>>>> internally emits pairs of "oldValue"/"newValue" . (side-note: It's by
>>>> forwarding both the old and new value that we are able to maintain
>>>> aggregates using the subtractor/adder pairs)
>>>>
>>>> 2. In the full topology, these old/new pairs go through some
>>>> transformations, but still in some form eventually make their way down to
>>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
>>>>
>>>> 3. The reduce processor logic looks like this:
>>>> final V oldAgg = store.get(key);
>>>> V newAgg = oldAgg;
>>>>
>>>> // first try to add the new value
>>>> if (value.newValue != null) {
>>>>     if (newAgg == null) {
>>>>         newAgg = value.newValue;
>>>>     } else {
>>>>         newAgg = addReducer.apply(newAgg, value.newValue);
>>>>     }
>>>> }
>>>>
>>>> // then try to remove the old value
>>>> if (value.oldValue != null) {
>>>>     // Here's where the assumption breaks down...
>>>>     newAgg = removeReducer.apply(newAgg, value.oldValue);
>>>> }
>>>>
>>>> 4. Here's what I think happened. This processor saw an event like
>>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block,
>>> and
>>>> (since "oldValue != null") would go into the second block. I think that
>>> in
>>>> the normal case we can rely on the invariant that any value we get as an
>>>> "oldValue" is one that we've previously seen ( as "newValue" ).
>>>> Consequently, we should be able to assume that if we get a non-null
>>>> "oldValue", "newAgg" will also not be null (because we would have written
>>>> it to the store back when we saw it as "newValue" and then retrieved it
>>>> just now as "newAgg = oldAgg").
>>>>
>>>> However, if subtopology2, along with KTABLE-SELECT-0000000006
>>>> and KSTREAM-SINK-0000000013 get added after (KSTREAM-REDUCE-0000000002 /
>>>> "table") has already emitted some values, then we might in fact receive
>>> an
>>>> event with some "oldValue" that we have in fact never seen before
>>> (because (
>>>> KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when
>>> it
>>>> was first emitted as a "newValue").
>>>>
>>>> This would violate our assumption, and we would unintentionally send a
>>>> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
>>>> If you want to double-check my reasoning, you should be able to do so in
>>>> the debugger with a breakpoint in KTableReduce.
>>>>
>>>>
>>>> tl;dr: Supposing you reset the app when the topology changes, I think
>>> that
>>>> you should be able to rely on non-null aggregates being passed in to
>>> *both*
>>>> the adder and subtractor in a reduce.
>>>>
>>>> I would be in favor, as you suggested, of adding an explicit check and
>>>> throwing an exception if the aggregate is ever null at those points. This
>>>> would actually help us detect if the topology has changed unexpectedly
>>> and
>>>> shut down, hopefully before any damage is done. I'll send a PR and see
>>> what
>>>> everyone thinks.
>>>>
>>>> Does this all seem like it adds up to you?
>>>> -John
>>>>
>>>>
>>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
>>>> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Thanks for your reply. I am not sure if this behavior I've observed is
>>>>> a bug or not, as I've not been resetting my application properly. On
>>>>> the other hand if the subtractor or adder in the reduce operation are
>>>>> never supposed to be called with null aggregator value, perhaps it
>>>>> would make sense to put a null check in the table reduce
>>>>> implementation to detect an application entering an invalid state. A
>>>>> bit like a check for topics having the same number of partitions when
>>>>> doing a join.
>>>>>
>>>>> Here's some information about my tests. Hope that can be useful:
>>>>>
>>>>> Topology at start:
>>>>>
>>>>> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
>>>>>    Sub-topology: 0
>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>       --> KSTREAM-MAP-0000000001
>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000003
>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>
>>>>>   Sub-topology: 1
>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>       --> KTABLE-TOSTREAM-0000000006
>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000007
>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
>>>>>       <-- KTABLE-TOSTREAM-0000000006
>>>>>
>>>>> This topology just takes data from the source topic "slope" which
>>>>> produces messages like this:
>>>>>
>>>>> key1
>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>> key3
>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>> key2
>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>> key3
>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>> key1
>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>> key2
>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>
>>>>> Every second, 3 new messages arrive from the "slope" topic for
>>>>> keys key1, key2 and key3, with constantly increasing values.
>>>>> Data is transformed so that the original key is also tracked in the
>>>>> message value, grouped by key, and windowed with a custom window, and
>>>>> reduced with a dummy reduce operation to make a KTable.
>>>>> KTable is converted back to a stream, and sent to a topic (just for
>>>>> debugging purposes).
>>>>>
>>>>> Here's the source (it's kafka-streams-scala for the most part). Also
>>>>> please ignore silly classes, it's obviously a test:
>>>>>
>>>>>     val slopeTable = builder
>>>>>       .stream[String, TimedValue]("slope")
>>>>>       .map(
>>>>>         (key, value) =>
>>>>>           (
>>>>>             StringWrapper(key),
>>>>>             TimedValueWithKey(value = value.value, timestamp =
>>>>> value.timestamp, key = key)
>>>>>         )
>>>>>       )
>>>>>       .groupByKey
>>>>>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
>>>>>       .reduceMat((aggValue, newValue) => newValue, "table")
>>>>>
>>>>>     slopeTable.toStream
>>>>>       .to("slope-table")
>>>>>
>>>>> Topology after change without a proper reset:
>>>>>
>>>>> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
>>>>>    Sub-topology: 0
>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>       --> KSTREAM-MAP-0000000001
>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000003
>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>
>>>>>   Sub-topology: 1
>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000007
>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000013
>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
>>>>>       <-- KTABLE-SELECT-0000000006
>>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
>>>>>       <-- KTABLE-TOSTREAM-0000000012
>>>>>
>>>>>   Sub-topology: 2
>>>>>     Source: KSTREAM-SOURCE-0000000008 (topics:
>>>>> [aggregated-table-repartition])
>>>>>       --> KTABLE-REDUCE-0000000009
>>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
>>>>>       --> KTABLE-TOSTREAM-0000000010
>>>>>       <-- KSTREAM-SOURCE-0000000008
>>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000011
>>>>>       <-- KTABLE-REDUCE-0000000009
>>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
>>>>>       <-- KTABLE-TOSTREAM-0000000010
>>>>>
>>>>> Here's the source of the sub-topology that does table aggregation:
>>>>>
>>>>>     slopeTable
>>>>>       .groupBy(
>>>>>         (key, value) => (new Windowed(StringWrapper("dummykey"),
>>>>> key.window()), value)
>>>>>       )
>>>>>       .reduceMat(adder = (aggValue, newValue) => {
>>>>>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
>>>>>         val agg = Option(aggValue)
>>>>>         TimedValueWithKey(
>>>>>           value = agg.map(_.value).getOrElse(0) + newValue.value,
>>>>>           timestamp =
>>>>>
>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
>>>>> newValue.timestamp),
>>>>>           key = "reduced"
>>>>>         )
>>>>>       }, subtractor = (aggValue, newValue) => {
>>>>>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
>>>>>         val agg = Option(aggValue)
>>>>>         TimedValueWithKey(
>>>>>           value = agg.map(_.value).getOrElse(0) - newValue.value,
>>>>>           timestamp =
>>>>>
>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
>>>>> newValue.timestamp),
>>>>>           key = "reduced"
>>>>>         )
>>>>>       }, "aggregated-table")
>>>>>       .toStream
>>>>>       .to("slope-aggregated-table")
>>>>>
>>>>> I log all calls to adder and subtractor, so I am able to see what's
>>>>> going on there, as well as I track the original keys of the aggregated
>>>>> values and their timestamps, so it's relatively easy to see how the
>>>>> data goes through this topology
>>>>>
>>>>> In order to reproduce this behavior I need to:
>>>>> 1. Start a full topology (with table aggregation)
>>>>> 2. Start without table aggregation (no app reset)
>>>>> 3. Start with table aggregation (no app reset)
>>>>>
>>>>> Below is an interpretation of the adder/subtractor logs for a given
>>>>> key/window in the chronological order
>>>>>
>>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
>>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
>>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
>>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
>>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
>>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
>>>>>
>>>>> And in the end the last value that's materialized for that window
>>>>> (i.e. windowed key) in the kafka topic is 57, i.e. an increase in value
>>>>> for a single key between some point in the middle of the window and at
>>>>> the end of the window, times 3. As opposed to the expected value of
>>>>> 751 * 3 = 2253 (sum of last values in a time window for all keys being
>>>>> aggregated).
>>>>>
>>>>> It's clear to me that I should do an application reset, but I also
>>>>> would like to understand, should I expect adder/subtractor being
>>>>> called with null aggValue, or is it a clear sign that something went
>>>>> horribly wrong?
>>>>>
>>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io>
>>> wrote:
>>>>>>
>>>>>> Hi Vasily,
>>>>>>
>>>>>> Thanks for the email.
>>>>>>
>>>>>> To answer your question: you should reset the application basically
>>> any
>>>>>> time you change the topology. Some transitions are safe, but others
>>> will
>>>>>> result in data loss or corruption. Rather than try to reason about
>>> which
>>>>> is
>>>>>> which, it's much safer just to either reset the app or not change it
>>> (if
>>>>> it
>>>>>> has important state).
>>>>>>
>>>>>> Beyond changes that you make to the topology, we spend a lot of
>>> effort to
>>>>>> try and make sure that different versions of Streams will produce the
>>>>> same
>>>>>> topology, so unless the release notes say otherwise, you should be
>>> able
>>>>> to
>>>>>> upgrade without a reset.
>>>>>>
>>>>>>
>>>>>> I can't say right now whether those wacky behaviors are bugs or the
>>>>> result
>>>>>> of changing the topology without a reset. Or if they are correct but
>>>>>> surprising behavior somehow. I'll look into it tomorrow. Do feel
>>> free to
>>>>>> open a Jira ticket if you think you have found a bug, especially if
>>> you
>>>>> can
>>>>>> describe a repro. Knowing your topology before and after the change
>>> would
>>>>>> also be immensely helpful. You can print it with Topology.describe().
>>>>>>
>>>>>> Regardless, I'll make a note to take a look at the code tomorrow and
>>> try
>>>>> to
>>>>>> decide if you should expect these behaviors with "clean" topology
>>>>> changes.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <
>>> vasily@sulatskov.net>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am doing some experiments with kafka-streams KGroupedTable
>>>>>>> aggregation, and admittedly I am not wiping data properly on each
>>>>>>> restart, partially because I also wonder what would happen if you
>>>>>>> change a streams topology without doing a proper reset.
>>>>>>>
>>>>>>> I've noticed that from time to time, kafka-streams
>>>>>>> KGroupedTable.reduce() can call subtractor function with null
>>>>>>> aggregator value, and if you try to work around that, by
>>> interpreting
>>>>>>> null aggregator value as zero for numeric value you get incorrect
>>>>>>> aggregation result.
>>>>>>>
>>>>>>> I do understand that the proper way of handling this is to do a
>>> reset
>>>>>>> on topology changes, but I'd like to understand if there's any
>>>>>>> legitimate case when kafka-streams can call an adder or a
>>> subtractor
>>>>>>> with null aggregator value, and should I plan for this, or should I
>>>>>>> interpret this as an invalid state, and terminate the application,
>>> and
>>>>>>> do a proper reset?
>>>>>>>
>>>>>>> Also, I can't seem to find a guide which explains when application
>>>>>>> reset is necessary. Intuitively it seems that it should be done
>>> every
>>>>>>> time a topology changes. Any other cases?
>>>>>>>
>>>>>>> I tried to debug where the null value comes from and it seems that
>>>>>>> KTableReduce.process() is getting called with Change<V> value with
>>>>>>> newValue == null, and some non-null oldValue. Which leads to the
>>>>>>> subtractor being called with null aggregator value. I wonder how
>>> it is
>>>>>>> possible to have an old value for a key without a new value (does
>>> it
>>>>>>> happen because of the auto commit interval?).
>>>>>>>
>>>>>>> I've also noticed that it's possible for an input value from a
>>> topic
>>>>>>> to bypass aggregation function entirely and be directly
>>> transmitted to
>>>>>>> the output in certain cases: oldAgg is null, newValue is not null
>>> and
>>>>>>> oldValue is null - in that case newValue will be transmitted
>>> directly
>>>>>>> to the output. I suppose it's the correct behaviour, but feels a
>>> bit
>>>>>>> weird nonetheless. And I've actually been able to observe this
>>>>>>> behaviour in practice. I suppose it's also caused by this happening
>>>>>>> right before a commit happens, and the message is sent to a
>>> changelog
>>>>>>> topic.
>>>>>>>
>>>>>>> Please can someone with more knowledge shed some light on these
>>> issues?
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Vasily Sulatskov
>>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Vasily Sulatskov
>>>>>
>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Vasily Sulatskov
>>>
> 
> 
> 


Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by Vasily Sulatskov <va...@sulatskov.net>.
Hi John,

Thanks a lot for your explanation. It does make much more sense now.

I think the Jira issue is pretty well explained (with a reference to
this thread), and I've left my 2 cents in the pull request.

You are right: I didn't notice that the repartition topic effectively
contains the same message twice. The 0/1 flag bytes are non-printable,
so they didn't show up when I used kafka-console-consumer. A quick
suggestion: wouldn't it make sense to change the 0 and 1 bytes to
something with visible ASCII characters, say + and -, since these
messages are effectively commands telling the reducer to execute
either an addition or a subtraction?
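
For illustration, the flag-byte scheme discussed here can be sketched as follows. This is a simplified stand-in, not the actual ChangedSerializer/ChangedDeserializer: the class name, the use of UTF-8 strings as values, and the placement of the flag as a trailing byte are assumptions. Only the 0/1 flag values and the rule that each Change carries exactly one of new or old come from this thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ChangeFlagCodecSketch {
    static final byte OLD_FLAG = 0; // the value is a retraction (oldValue)
    static final byte NEW_FLAG = 1; // the value is an update (newValue)

    // Serializes a one-sided change as: payload bytes, then a single flag byte.
    static byte[] encode(String newValue, String oldValue) {
        if ((newValue == null) == (oldValue == null)) {
            throw new IllegalArgumentException("exactly one of newValue/oldValue must be set");
        }
        String value = (newValue != null) ? newValue : oldValue;
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(payload.length + 1)
                .put(payload)
                .put(newValue != null ? NEW_FLAG : OLD_FLAG)
                .array();
    }

    // Returns {newValue, oldValue}; the side not flagged is null.
    static String[] decode(byte[] data) {
        String value = new String(data, 0, data.length - 1, StandardCharsets.UTF_8);
        boolean isNew = data[data.length - 1] == NEW_FLAG;
        return isNew ? new String[] {value, null} : new String[] {null, value};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(decode(encode("751", null))));
        System.out.println(Arrays.toString(decode(encode(null, "732"))));
    }
}
```

Because the flag is a raw 0 or 1 byte rather than a printable character, a console consumer that prints the record as text shows only the payload, which matches the observation above that the repartition topic appears to contain plain values.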

On a more serious note, can you please explain the temporal aspects of
how change messages are handled? More specifically, is it guaranteed
that both Change(newValue, null) and Change(null, oldValue) are
handled before a new aggregated value is committed to an output topic?
Change(newValue, null) and Change(null, oldValue) are delivered as two
separate messages via a Kafka topic. When they are read from that
topic (possibly on a different machine, whose commit interval is
asynchronous to that of the machine that put these changes into the
topic), can it happen that a Change(newValue, null) is processed by a
KTableReduceProcessor, the value of the aggregator is updated and
committed to the changelog topic, and the Change(null, oldValue) is
processed only in the next commit interval? If I understand this
correctly, that would mean that an incorrect aggregated value will be
observed briefly in the aggregated table before eventually being
corrected.

Can that happen? Or am I missing something that would make it impossible?
On Fri, Jul 13, 2018 at 8:05 PM John Roesler <jo...@confluent.io> wrote:
>
> Hi Vasily,
>
> I'm glad you're making me look at this; it's good homework for me!
>
> This is very non-obvious, but here's what happens:
>
> KStreamReduce is a Processor of (K, V) => (K, Change<V>). I.e., it emits
> new/old Change pairs as the value.
>
> Next is the Select (aka GroupBy). In the DSL code, this is the
> KTableRepartitionMap (we call it a repartition when you select a new key,
> since the new keys may belong to different partitions).
> KTableRepartitionMap is a processor that does two things:
> 1. it maps K => K1 (new keys) and V => V1 (new values)
> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
> null)]
> In other words, it turns each Change event into two events: a retraction
> and an update
>
> Next comes the reduce operation. In building the processor node for this
> operation, we create the sink, repartition topic, and source, followed by
> the actual Reduce node. So if you want to look at how the changes get
> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
> You'll see that the sink and source use a ChangedSerializer and
> ChangedDeserializer.
>
> By looking into those implementations, I found that they depend on each
> Change containing just one of new OR old. They serialize the underlying
> value using the serde you provide, along with a single byte that signifies
> if the serialized value is the new or old value, which the deserializer
> uses on the receiving end to turn it back into a Change(new, null) or
> Change(null, old) as appropriate. This is why the repartition topic looks
> like it's just the raw data. It basically is, except for the magic byte.
>
> Does that make sense?
>
> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
> leaving any feedback you have?
>
> Thanks,
> -John
>
> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <va...@sulatskov.net>
> wrote:
>
> > Hi John,
> >
> > Thanks for your explanation.
> >
> > I have an answer to the practical question, i.e. a null aggregator
> > value should be interpreted as a fatal application error.
> >
> > On the other hand, looking at the app topology, I see that a message
> > from KSTREAM-REDUCE-0000000002 / "table" goes to
> > KTABLE-SELECT-0000000006 which in turn forwards data to
> > KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
> > this point I assume that data goes back to kafka into a *-repartition
> > topic, after that the message is read from kafka by
> > KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
> > and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
> > [aggregated-table]), where the actual aggregation takes place. What I
> > don't get is where this Change value comes from, I mean if it's been
> > produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
> > message goes through kafka where it gets serialized, and looking at
> > kafka "repartition" topic, it contains regular values, not a pair of
> > old/new.
> >
> > As far as I understand, Change is a purely in-memory representation of
> > the state for a particular key, and at no point is it serialized back
> > to kafka, yet somehow these Change values make it to the reducer. I feel
> > like I am missing something here. Could you please clarify this?
> >
> > Can you please point me to a place in kafka-streams sources where a
> > Change of newValue/oldValue is produced, so I could take a look? I
> > found the KTableReduce implementation, but can't find what produces these
> > Change values.
> > On Fri, Jul 13, 2018 at 6:17 PM John Roesler <jo...@confluent.io> wrote:
> > >
> > > Hi again Vasily,
> > >
> > > Ok, it looks to me like this behavior is the result of the un-clean
> > > topology change.
> > >
> > > Just in case you're interested, here's what I think happened.
> > >
> > > 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table"
> > )
> > > internally emits pairs of "oldValue"/"newValue" . (side-note: It's by
> > > forwarding both the old and new value that we are able to maintain
> > > aggregates using the subtractor/adder pairs)
> > >
> > > 2. In the full topology, these old/new pairs go through some
> > > transformations, but still in some form eventually make their way down to
> > > the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
> > >
> > > 3. The reduce processor logic looks like this:
> > > final V oldAgg = store.get(key);
> > > V newAgg = oldAgg;
> > >
> > > // first try to add the new value
> > > if (value.newValue != null) {
> > >     if (newAgg == null) {
> > >         newAgg = value.newValue;
> > >     } else {
> > >         newAgg = addReducer.apply(newAgg, value.newValue);
> > >     }
> > > }
> > >
> > > // then try to remove the old value
> > > if (value.oldValue != null) {
> > >     // Here's where the assumption breaks down...
> > >     newAgg = removeReducer.apply(newAgg, value.oldValue);
> > > }
> > >
> > > 4. Here's what I think happened. This processor saw an event like
> > > {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block,
> > and
> > > (since "oldValue != null") would go into the second block. I think that
> > in
> > > the normal case we can rely on the invariant that any value we get as an
> > > "oldValue" is one that we've previously seen ( as "newValue" ).
> > > Consequently, we should be able to assume that if we get a non-null
> > > "oldValue", "newAgg" will also not be null (because we would have written
> > > it to the store back when we saw it as "newValue" and then retrieved it
> > > just now as "newAgg = oldAgg").
> > >
> > > However, if subtopology2, along with KTABLE-SELECT-0000000006
> > > and KSTREAM-SINK-0000000013 get added after (KSTREAM-REDUCE-0000000002 /
> > > "table") has already emitted some values, then we might in fact receive
> > an
> > > event with some "oldValue" that we have in fact never seen before
> > (because (
> > > KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when
> > it
> > > was first emitted as a "newValue").
> > >
> > > This would violate our assumption, and we would unintentionally send a
> > > "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
> > > If you want to double-check my reasoning, you should be able to do so in
> > > the debugger with a breakpoint in KTableReduce.
> > >
> > >
> > > tl;dr: Supposing you reset the app when the topology changes, I think
> > that
> > > you should be able to rely on non-null aggregates being passed in to
> > *both*
> > > the adder and subtractor in a reduce.
> > >
> > > I would be in favor, as you suggested, of adding an explicit check and
> > > throwing an exception if the aggregate is ever null at those points. This
> > > would actually help us detect if the topology has changed unexpectedly
> > and
> > > shut down, hopefully before any damage is done. I'll send a PR and see
> > what
> > > everyone thinks.
> > >
> > > Does this all seem like it adds up to you?
> > > -John
> > >
> > >
> > > On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
> > > wrote:
> > >
> > > > Hi John,
> > > >
> > > > Thanks for your reply. I am not sure if this behavior I've observed is
> > > > a bug or not, as I've not been resetting my application properly. On
> > > > the other hand if the subtractor or adder in the reduce operation are
> > > > never supposed to be called with null aggregator value, perhaps it
> > > > would make sense to put a null check in the table reduce
> > > > implementation to detect an application entering an invalid state. A
> > > > bit like a check for topics having the same number of partitions when
> > > > doing a join.
> > > >
> > > > Here's some information about my tests. Hope that can be useful:
> > > >
> > > > Topology at start:
> > > >
> > > > 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
> > > >    Sub-topology: 0
> > > >     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> > > >       --> KSTREAM-MAP-0000000001
> > > >     Processor: KSTREAM-MAP-0000000001 (stores: [])
> > > >       --> KSTREAM-FILTER-0000000004
> > > >       <-- KSTREAM-SOURCE-0000000000
> > > >     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> > > >       --> KSTREAM-SINK-0000000003
> > > >       <-- KSTREAM-MAP-0000000001
> > > >     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> > > >       <-- KSTREAM-FILTER-0000000004
> > > >
> > > >   Sub-topology: 1
> > > >     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> > > >       --> KSTREAM-REDUCE-0000000002
> > > >     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> > > >       --> KTABLE-TOSTREAM-0000000006
> > > >       <-- KSTREAM-SOURCE-0000000005
> > > >     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> > > >       --> KSTREAM-SINK-0000000007
> > > >       <-- KSTREAM-REDUCE-0000000002
> > > >     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
> > > >       <-- KTABLE-TOSTREAM-0000000006
> > > >
> > > > This topology just takes data from the source topic "slope" which
> > > > produces messages like this:
> > > >
> > > > key1
> > > > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > > > key3
> > > > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > > > key2
> > > > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > > > key3
> > > > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > > > key1
> > > > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > > > key2
> > > > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > > >
> > > > Every second, there are 3 new messages arrive from "slope" topic for
> > > > keys key1, key2 and key3, with constantly increasing value.
> > > > Data is transformed so that the original key is also tracked in the
> > > > message value, grouped by key, and windowed with a custom window, and
> > > > reduced with a dummy reduce operation to make a KTable.
> > > > KTable is converted back to a stream, and sent to a topic (just for
> > > > debugging purposes).
> > > >
> > > > Here's the source (it's kafka-streams-scala for the most part). Also
> > > > please ignore silly classes, it's obviously a test:
> > > >
> > > >     val slopeTable = builder
> > > >       .stream[String, TimedValue]("slope")
> > > >       .map(
> > > >         (key, value) =>
> > > >           (
> > > >             StringWrapper(key),
> > > >             TimedValueWithKey(value = value.value, timestamp =
> > > > value.timestamp, key = key)
> > > >         )
> > > >       )
> > > >       .groupByKey
> > > >       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
> > > >       .reduceMat((aggValue, newValue) => newValue, "table")
> > > >
> > > >     slopeTable.toStream
> > > >       .to("slope-table")
> > > >
> > > > Topology after change without a proper reset:
> > > >
> > > > 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
> > > >    Sub-topology: 0
> > > >     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> > > >       --> KSTREAM-MAP-0000000001
> > > >     Processor: KSTREAM-MAP-0000000001 (stores: [])
> > > >       --> KSTREAM-FILTER-0000000004
> > > >       <-- KSTREAM-SOURCE-0000000000
> > > >     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> > > >       --> KSTREAM-SINK-0000000003
> > > >       <-- KSTREAM-MAP-0000000001
> > > >     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> > > >       <-- KSTREAM-FILTER-0000000004
> > > >
> > > >   Sub-topology: 1
> > > >     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> > > >       --> KSTREAM-REDUCE-0000000002
> > > >     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> > > >       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
> > > >       <-- KSTREAM-SOURCE-0000000005
> > > >     Processor: KTABLE-SELECT-0000000006 (stores: [])
> > > >       --> KSTREAM-SINK-0000000007
> > > >       <-- KSTREAM-REDUCE-0000000002
> > > >     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
> > > >       --> KSTREAM-SINK-0000000013
> > > >       <-- KSTREAM-REDUCE-0000000002
> > > >     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
> > > >       <-- KTABLE-SELECT-0000000006
> > > >     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
> > > >       <-- KTABLE-TOSTREAM-0000000012
> > > >
> > > >   Sub-topology: 2
> > > >     Source: KSTREAM-SOURCE-0000000008 (topics:
> > > > [aggregated-table-repartition])
> > > >       --> KTABLE-REDUCE-0000000009
> > > >     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
> > > >       --> KTABLE-TOSTREAM-0000000010
> > > >       <-- KSTREAM-SOURCE-0000000008
> > > >     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
> > > >       --> KSTREAM-SINK-0000000011
> > > >       <-- KTABLE-REDUCE-0000000009
> > > >     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
> > > >       <-- KTABLE-TOSTREAM-0000000010
> > > >
> > > > Here's the source of the sub-topology that does table aggregation:
> > > >
> > > >     slopeTable
> > > >       .groupBy(
> > > >         (key, value) => (new Windowed(StringWrapper("dummykey"),
> > > > key.window()), value)
> > > >       )
> > > >       .reduceMat(adder = (aggValue, newValue) => {
> > > >         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
> > > >         val agg = Option(aggValue)
> > > >         TimedValueWithKey(
> > > >           value = agg.map(_.value).getOrElse(0) + newValue.value,
> > > >           timestamp =
> > > >
> > > > Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> > > > newValue.timestamp),
> > > >           key = "reduced"
> > > >         )
> > > >       }, subtractor = (aggValue, newValue) => {
> > > >         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
> > > >         val agg = Option(aggValue)
> > > >         TimedValueWithKey(
> > > >           value = agg.map(_.value).getOrElse(0) - newValue.value,
> > > >           timestamp =
> > > >
> > > > Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> > > > newValue.timestamp),
> > > >           key = "reduced"
> > > >         )
> > > >       }, "aggregated-table")
> > > >       .toStream
> > > >       .to("slope-aggregated-table")
> > > >
> > > > I log all calls to adder and subtractor, so I am able to see what's
> > > > going on there, as well as I track the original keys of the aggregated
> > > > values and their timestamps, so it's relatively easy to see how the
> > > > data goes through this topology
> > > >
> > > > In order to reproduce this behavior I need to:
> > > > 1. Start a full topology (with table aggregation)
> > > > 2. Start without table aggregation (no app reset)
> > > > 3. Start with table aggregation (no app reset)
> > > >
> > > > Bellow is an interpretation of the adder/subtractor logs for a given
> > > > key/window in the chronological order
> > > >
> > > > SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> > > > ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> > > > SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> > > > ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> > > > SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> > > > ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
> > > >
> > > > And in the end the last value that's materialized for that window
> > > > (i.e. windowed key) in the kafka topic is 57, i.e. a increase in value
> > > > for a single key between some point in the middle of the window and at
> > > > the end of the window, times 3. As opposed to the expected value of
> > > > 751 * 3 = 2253 (sum of last values in a time window for all keys being
> > > > aggregated).
> > > >
> > > > It's clear to me that I should do an application reset, but I also
> > > > would like to understand, should I expect adder/subtractor being
> > > > called with null aggValue, or is it a clear sign that something went
> > > > horribly wrong?
> > > >
> > > > On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io>
> > wrote:
> > > > >
> > > > > Hi Vasily,
> > > > >
> > > > > Thanks for the email.
> > > > >
> > > > > To answer your question: you should reset the application basically
> > any
> > > > > time you change the topology. Some transitions are safe, but others
> > will
> > > > > result in data loss or corruption. Rather than try to reason about
> > which
> > > > is
> > > > > which, it's much safer just to either reset the app or not change it
> > (if
> > > > it
> > > > > has important state).
> > > > >
> > > > > Beyond changes that you make to the topology, we spend a lot of
> > effort to
> > > > > try and make sure that different versions of Streams will produce the
> > > > same
> > > > > topology, so unless the release notes say otherwise, you should be
> > able
> > > > to
> > > > > upgrade without a reset.
> > > > >
> > > > >
> > > > > I can't say right now whether those wacky behaviors are bugs or the
> > > > result
> > > > > of changing the topology without a reset. Or if they are correct but
> > > > > surprising behavior somehow. I'll look into it tomorrow. Do feel
> > free to
> > > > > open a Jira ticket if you think you have found a bug, especially if
> > you
> > > > can
> > > > > describe a repro. Knowing your topology before and after the change
> > would
> > > > > also be immensely helpful. You can print it with Topology.describe().
> > > > >
> > > > > Regardless, I'll make a note to take a look at the code tomorrow and
> > try
> > > > to
> > > > > decide if you should expect these behaviors with "clean" topology
> > > > changes.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <
> > vasily@sulatskov.net>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I am doing some experiments with kafka-streams KGroupedTable
> > > > > > aggregation, and admittedly I am not wiping data properly on each
> > > > > > restart, partially because I also wonder what would happen if you
> > > > > > change a streams topology without doing a proper reset.
> > > > > >
> > > > > > I've noticed that from time to time, kafka-streams
> > > > > > KGroupedTable.reduce() can call subtractor function with null
> > > > > > aggregator value, and if you try to work around that, by
> > interpreting
> > > > > > null aggregator value as zero for numeric value you get incorrect
> > > > > > aggregation result.
> > > > > >
> > > > > > I do understand that the proper way of handling this is to do a
> > reset
> > > > > > on topology changes, but I'd like to understand if there's any
> > > > > > legitmate case when kafka-streams can call an adder or a
> > substractor
> > > > > > with null aggregator value, and should I plan for this, or should I
> > > > > > interpret this as an invalid state, and terminate the application,
> > and
> > > > > > do a proper reset?
> > > > > >
> > > > > > Also, I can't seem to find a guide which explains when application
> > > > > > reset is necessary. Intuitively it seems that it should be done
> > every
> > > > > > time a topology changes. Any other cases?
> > > > > >
> > > > > > I tried to debug where the null value comes from and it seems that
> > > > > > KTableReduce.process() is getting called with Change<V> value with
> > > > > > newValue == null, and some non-null oldValue. Which leads to and to
> > > > > > subtractor being called with null aggregator value. I wonder how
> > it is
> > > > > > possible to have an old value for a key without a new value (does
> > it
> > > > > > happen because of the auto commit interval?).
> > > > > >
> > > > > > I've also noticed that it's possible for an input value from a
> > topic
> > > > > > to bypass aggregation function entirely and be directly
> > transmitted to
> > > > > > the output in certain cases: oldAgg is null, newValue is not null
> > and
> > > > > > oldValue is null - in that case newValue will be transmitted
> > directly
> > > > > > to the output. I suppose it's the correct behaviour, but feels a
> > bit
> > > > > > weird nonetheless. And I've actually been able to observe this
> > > > > > behaviour in practice. I suppose it's also caused by this happening
> > > > > > right before a commit happens, and the message is sent to a
> > changelog
> > > > > > topic.
> > > > > >
> > > > > > Please can someone with more knowledge shed some light on these
> > issues?
> > > > > >
> > > > > > --
> > > > > > Best regards,
> > > > > > Vasily Sulatskov
> > > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Best regards,
> > > > Vasily Sulatskov
> > > >
> >
> >
> >
> > --
> > Best regards,
> > Vasily Sulatskov
> >



-- 
Best regards,
Vasily Sulatskov

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by John Roesler <jo...@confluent.io>.
Hi Vasily,

I'm glad you're making me look at this; it's good homework for me!

This is very non-obvious, but here's what happens:

KStreamReduce is a Processor of (K, V) => (K, Change<V>). That is, it emits
new/old Change pairs as the value.

Next is the Select (aka GroupBy). In the DSL code, this is the
KTableRepartitionMap (we call it a repartition when you select a new key,
since the new keys may belong to different partitions).
KTableRepartitionMap is a processor that does two things:
1. it maps K => K1 (new keys) and V => V1 (new values)
2. it "explodes" Change(new, old) into [Change(null, old), Change(new, null)]
In other words, it turns each Change event into two events: a retraction
and an update.
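
To make the "explode" step concrete, here is a minimal sketch. It is not the
actual KTableRepartitionMap code: the Change class below is a hypothetical
stand-in for the internal type, the key/value mapping is left out, and I'm
assuming a null side is simply not forwarded.

```java
import java.util.ArrayList;
import java.util.List;

final class ExplodeSketch {
    // Hypothetical stand-in for the internal Change type (not the real class).
    static final class Change<V> {
        final V newValue;
        final V oldValue;

        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }

        @Override
        public String toString() {
            return "Change(" + newValue + ", " + oldValue + ")";
        }
    }

    // Split Change(new, old) into a retraction Change(null, old) followed by
    // an update Change(new, null). Assumption: a null side emits nothing.
    static <V> List<Change<V>> explode(Change<V> change) {
        List<Change<V>> out = new ArrayList<>();
        if (change.oldValue != null) {
            out.add(new Change<>(null, change.oldValue));
        }
        if (change.newValue != null) {
            out.add(new Change<>(change.newValue, null));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [Change(null, 732), Change(751, null)]
        System.out.println(explode(new Change<>(751, 732)));
    }
}
```

The retraction comes first in this sketch, which matches the SUB-then-ADD
order in your logs.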

Next comes the reduce operation. In building the processor node for this
operation, we create the sink, repartition topic, and source, followed by
the actual Reduce node. So if you want to look at how the changes get
serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
You'll see that the sink and source use a ChangedSerializer and
ChangedDeserializer.

By looking into those implementations, I found that they depend on each
Change containing just one of new OR old. They serialize the underlying
value using the serde you provide, along with a single byte that signifies
if the serialized value is the new or old value, which the deserializer
uses on the receiving end to turn it back into a Change(new, null) or
Change(null, old) as appropriate. This is why the repartition topic looks
like it's just the raw data. It basically is, except for the magic byte.
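
Here is a rough sketch of that flag-byte idea, in case it helps. It is not
the real ChangedSerializer/ChangedDeserializer: the class and method names
are made up, values are plain UTF-8 strings instead of going through your
serde, and putting the flag at the end with 1 = new and 0 = old is my
assumption for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ChangeFlagSerdeSketch {
    // Hypothetical stand-in for Change<String>; exactly one side is non-null.
    static final class Change {
        final String newValue;
        final String oldValue;

        Change(String newValue, String oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Write the raw value bytes followed by one flag byte.
    // Assumption for this sketch: flag 1 means "new", flag 0 means "old".
    static byte[] serialize(Change change) {
        if ((change.newValue == null) == (change.oldValue == null)) {
            throw new IllegalArgumentException("exactly one of new/old must be set");
        }
        boolean isNew = change.newValue != null;
        String value = isNew ? change.newValue : change.oldValue;
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(payload.length + 1)
                .put(payload)
                .put((byte) (isNew ? 1 : 0))
                .array();
    }

    // Read the flag byte back and rebuild Change(new, null) or Change(null, old).
    static Change deserialize(byte[] data) {
        String value = new String(data, 0, data.length - 1, StandardCharsets.UTF_8);
        boolean isNew = data[data.length - 1] != 0;
        return isNew ? new Change(value, null) : new Change(null, value);
    }

    public static void main(String[] args) {
        Change roundTripped = deserialize(serialize(new Change(null, "732")));
        // prints null / 732
        System.out.println(roundTripped.newValue + " / " + roundTripped.oldValue);
    }
}
```

Apart from that one extra byte, the payload is exactly what your serde
produced, which is why a console consumer shows what looks like plain values.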

Does that make sense?

Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
leaving any feedback you have?

Thanks,
-John

On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <va...@sulatskov.net>
wrote:

> Hi John,
>
> Thanks for your explanation.
>
> I have an answer to the practical question, i.e. a null aggregator
> value should be interpreted as a fatal application error.
>
> On the other hand, looking at the app topology, I see that a message
> from KSTREAM-REDUCE-0000000002 / "table" goes to
> KTABLE-SELECT-0000000006 which in turn forwards data to
> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
> this point I assume that data goes back to kafka into a *-repartition
> topic, after that the message is read from kafka by
> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
> [aggregated-table]), where the actual aggregation takes place. What I
> don't get is where this Change value comes from, I mean if it's been
> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
> message goes through kafka where it gets serialized, and looking at
> kafka "repartition" topic, it contains regular values, not a pair of
> old/new.
>
> As far as I understand, Change is a purely in-memory representation of
> the state for a particular key, and at no point is it serialized back
> to kafka, yet somehow these Change values make it to the reducer. I feel
> like I am missing something here. Could you please clarify this?
>
> Can you please point me to a place in kafka-streams sources where a
> Change of newValue/oldValue is produced, so I could take a look? I
> found the KTableReduce implementation, but can't find what creates these
> Change values.
> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <jo...@confluent.io> wrote:
> >
> > Hi again Vasily,
> >
> > Ok, it looks to me like this behavior is the result of the un-clean
> > topology change.
> >
> > Just in case you're interested, here's what I think happened.
> >
> > 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table"
> )
> > internally emits pairs of "oldValue"/"newValue" . (side-note: It's by
> > forwarding both the old and new value that we are able to maintain
> > aggregates using the subtractor/adder pairs)
> >
> > 2. In the full topology, these old/new pairs go through some
> > transformations, but still in some form eventually make their way down to
> > the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
> >
> > 3. The reduce processor logic looks like this:
> > final V oldAgg = store.get(key);
> > V newAgg = oldAgg;
> >
> > // first try to add the new value
> > if (value.newValue != null) {
> >     if (newAgg == null) {
> >         newAgg = value.newValue;
> >     } else {
> >         newAgg = addReducer.apply(newAgg, value.newValue);
> >     }
> > }
> >
> > // then try to remove the old value
> > if (value.oldValue != null) {
> >     // Here's where the assumption breaks down...
> >     newAgg = removeReducer.apply(newAgg, value.oldValue);
> > }
> >
> > 4. Here's what I think happened. This processor saw an event like
> > {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block,
> and
> > (since "oldValue != null") would go into the second block. I think that
> in
> > the normal case we can rely on the invariant that any value we get as an
> > "oldValue" is one that we've previously seen ( as "newValue" ).
> > Consequently, we should be able to assume that if we get a non-null
> > "oldValue", "newAgg" will also not be null (because we would have written
> > it to the store back when we saw it as "newValue" and then retrieved it
> > just now as "newAgg = oldAgg").
> >
> > However, if subtopology2, along with KTABLE-SELECT-0000000006
> > and KSTREAM-SINK-0000000013 get added after (KSTREAM-REDUCE-0000000002 /
> > "table") has already emitted some values, then we might in fact receive
> an
> > event with some "oldValue" that we have in fact never seen before
> (because (
> > KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when
> it
> > was first emitted as a "newValue").
> >
> > This would violate our assumption, and we would unintentionally send a
> > "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
> > If you want to double-check my reasoning, you should be able to do so in
> > the debugger with a breakpoint in KTableReduce.
> >
> >
> > tl;dr: Supposing you reset the app when the topology changes, I think
> that
> > you should be able to rely on non-null aggregates being passed in to
> *both*
> > the adder and subtractor in a reduce.
> >
> > I would be in favor, as you suggested, of adding an explicit check and
> > throwing an exception if the aggregate is ever null at those points. This
> > would actually help us detect if the topology has changed unexpectedly
> and
> > shut down, hopefully before any damage is done. I'll send a PR and see
> what
> > everyone thinks.
> >
> > Does this all seem like it adds up to you?
> > -John
> >
> >
> > On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
> > wrote:
> >
> > > Hi John,
> > >
> > > Thanks for your reply. I am not sure if this behavior I've observed is
> > > a bug or not, as I've not been resetting my application properly. On
> > > the other hand if the subtractor or adder in the reduce operation are
> > > never supposed to be called with null aggregator value, perhaps it
> > > would make sense to put a null check in the table reduce
> > > implementation to detect an application entering an invalid state. A
> > > bit like a check for topics having the same number of partitions when
> > > doing a join.
> > >
> > > Here's some information about my tests. Hope that can be useful:
> > >
> > > Topology at start:
> > >
> > > 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
> > >    Sub-topology: 0
> > >     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> > >       --> KSTREAM-MAP-0000000001
> > >     Processor: KSTREAM-MAP-0000000001 (stores: [])
> > >       --> KSTREAM-FILTER-0000000004
> > >       <-- KSTREAM-SOURCE-0000000000
> > >     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> > >       --> KSTREAM-SINK-0000000003
> > >       <-- KSTREAM-MAP-0000000001
> > >     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> > >       <-- KSTREAM-FILTER-0000000004
> > >
> > >   Sub-topology: 1
> > >     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> > >       --> KSTREAM-REDUCE-0000000002
> > >     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> > >       --> KTABLE-TOSTREAM-0000000006
> > >       <-- KSTREAM-SOURCE-0000000005
> > >     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> > >       --> KSTREAM-SINK-0000000007
> > >       <-- KSTREAM-REDUCE-0000000002
> > >     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
> > >       <-- KTABLE-TOSTREAM-0000000006
> > >
> > > This topology just takes data from the source topic "slope" which
> > > produces messages like this:
> > >
> > > key1
> > > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > > key3
> > > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > > key2
> > > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > > key3
> > > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > > key1
> > > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > > key2
> > > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > >
> > > Every second, there are 3 new messages arrive from "slope" topic for
> > > keys key1, key2 and key3, with constantly increasing value.
> > > Data is transformed so that the original key is also tracked in the
> > > message value, grouped by key, and windowed with a custom window, and
> > > reduced with a dummy reduce operation to make a KTable.
> > > KTable is converted back to a stream, and sent to a topic (just for
> > > debugging purposes).
> > >
> > > Here's the source (it's kafka-streams-scala for the most part). Also
> > > please ignore silly classes, it's obviously a test:
> > >
> > >     val slopeTable = builder
> > >       .stream[String, TimedValue]("slope")
> > >       .map(
> > >         (key, value) =>
> > >           (
> > >             StringWrapper(key),
> > >             TimedValueWithKey(value = value.value, timestamp =
> > > value.timestamp, key = key)
> > >         )
> > >       )
> > >       .groupByKey
> > >       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
> > >       .reduceMat((aggValue, newValue) => newValue, "table")
> > >
> > >     slopeTable.toStream
> > >       .to("slope-table")
> > >
> > > Topology after change without a proper reset:
> > >
> > > 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
> > >    Sub-topology: 0
> > >     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> > >       --> KSTREAM-MAP-0000000001
> > >     Processor: KSTREAM-MAP-0000000001 (stores: [])
> > >       --> KSTREAM-FILTER-0000000004
> > >       <-- KSTREAM-SOURCE-0000000000
> > >     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> > >       --> KSTREAM-SINK-0000000003
> > >       <-- KSTREAM-MAP-0000000001
> > >     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> > >       <-- KSTREAM-FILTER-0000000004
> > >
> > >   Sub-topology: 1
> > >     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> > >       --> KSTREAM-REDUCE-0000000002
> > >     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> > >       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
> > >       <-- KSTREAM-SOURCE-0000000005
> > >     Processor: KTABLE-SELECT-0000000006 (stores: [])
> > >       --> KSTREAM-SINK-0000000007
> > >       <-- KSTREAM-REDUCE-0000000002
> > >     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
> > >       --> KSTREAM-SINK-0000000013
> > >       <-- KSTREAM-REDUCE-0000000002
> > >     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
> > >       <-- KTABLE-SELECT-0000000006
> > >     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
> > >       <-- KTABLE-TOSTREAM-0000000012
> > >
> > >   Sub-topology: 2
> > >     Source: KSTREAM-SOURCE-0000000008 (topics:
> > > [aggregated-table-repartition])
> > >       --> KTABLE-REDUCE-0000000009
> > >     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
> > >       --> KTABLE-TOSTREAM-0000000010
> > >       <-- KSTREAM-SOURCE-0000000008
> > >     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
> > >       --> KSTREAM-SINK-0000000011
> > >       <-- KTABLE-REDUCE-0000000009
> > >     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
> > >       <-- KTABLE-TOSTREAM-0000000010
> > >
> > > Here's the source of the sub-topology that does table aggregation:
> > >
> > >     slopeTable
> > >       .groupBy(
> > >         (key, value) => (new Windowed(StringWrapper("dummykey"),
> > > key.window()), value)
> > >       )
> > >       .reduceMat(adder = (aggValue, newValue) => {
> > >         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
> > >         val agg = Option(aggValue)
> > >         TimedValueWithKey(
> > >           value = agg.map(_.value).getOrElse(0) + newValue.value,
> > >           timestamp =
> > >
> > > Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> > > newValue.timestamp),
> > >           key = "reduced"
> > >         )
> > >       }, subtractor = (aggValue, newValue) => {
> > >         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
> > >         val agg = Option(aggValue)
> > >         TimedValueWithKey(
> > >           value = agg.map(_.value).getOrElse(0) - newValue.value,
> > >           timestamp =
> > >
> > > Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> > > newValue.timestamp),
> > >           key = "reduced"
> > >         )
> > >       }, "aggregated-table")
> > >       .toStream
> > >       .to("slope-aggregated-table")
> > >
> > > I log all calls to adder and subtractor, so I am able to see what's
> > > going on there, as well as I track the original keys of the aggregated
> > > values and their timestamps, so it's relatively easy to see how the
> > > data goes through this topology
> > >
> > > In order to reproduce this behavior I need to:
> > > 1. Start a full topology (with table aggregation)
> > > 2. Start without table aggregation (no app reset)
> > > 3. Start with table aggregation (no app reset)
> > >
> > > Bellow is an interpretation of the adder/subtractor logs for a given
> > > key/window in the chronological order
> > >
> > > SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> > > ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> > > SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> > > ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> > > SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> > > ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
> > >
> > > And in the end the last value that's materialized for that window
> > > (i.e. windowed key) in the kafka topic is 57, i.e. a increase in value
> > > for a single key between some point in the middle of the window and at
> > > the end of the window, times 3. As opposed to the expected value of
> > > 751 * 3 = 2253 (sum of last values in a time window for all keys being
> > > aggregated).
> > >
> > > It's clear to me that I should do an application reset, but I also
> > > would like to understand, should I expect adder/subtractor being
> > > called with null aggValue, or is it a clear sign that something went
> > > horribly wrong?
> > >
> > > On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io>
> wrote:
> > > >
> > > > Hi Vasily,
> > > >
> > > > Thanks for the email.
> > > >
> > > > To answer your question: you should reset the application basically
> any
> > > > time you change the topology. Some transitions are safe, but others
> will
> > > > result in data loss or corruption. Rather than try to reason about
> which
> > > is
> > > > which, it's much safer just to either reset the app or not change it
> (if
> > > it
> > > > has important state).
> > > >
> > > > Beyond changes that you make to the topology, we spend a lot of
> effort to
> > > > try and make sure that different versions of Streams will produce the
> > > same
> > > > topology, so unless the release notes say otherwise, you should be
> able
> > > to
> > > > upgrade without a reset.
> > > >
> > > >
> > > > I can't say right now whether those wacky behaviors are bugs or the
> > > result
> > > > of changing the topology without a reset. Or if they are correct but
> > > > surprising behavior somehow. I'll look into it tomorrow. Do feel
> free to
> > > > open a Jira ticket if you think you have found a bug, especially if
> you
> > > can
> > > > describe a repro. Knowing your topology before and after the change
> would
> > > > also be immensely helpful. You can print it with Topology.describe().
> > > >
> > > > Regardless, I'll make a note to take a look at the code tomorrow and try to
> > > > decide if you should expect these behaviors with "clean" topology changes.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <vasily@sulatskov.net>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am doing some experiments with kafka-streams KGroupedTable
> > > > > aggregation, and admittedly I am not wiping data properly on each
> > > > > restart, partially because I also wonder what would happen if you
> > > > > change a streams topology without doing a proper reset.
> > > > >
> > > > > I've noticed that from time to time, kafka-streams
> > > > > KGroupedTable.reduce() can call subtractor function with null
> > > > > aggregator value, and if you try to work around that, by interpreting
> > > > > null aggregator value as zero for numeric value you get incorrect
> > > > > aggregation result.
> > > > >
> > > > > I do understand that the proper way of handling this is to do a reset
> > > > > on topology changes, but I'd like to understand if there's any
> > > > > legitimate case when kafka-streams can call an adder or a subtractor
> > > > > with null aggregator value, and should I plan for this, or should I
> > > > > interpret this as an invalid state, and terminate the application, and
> > > > > do a proper reset?
> > > > >
> > > > > Also, I can't seem to find a guide which explains when application
> > > > > reset is necessary. Intuitively it seems that it should be done every
> > > > > time a topology changes. Any other cases?
> > > > >
> > > > > I tried to debug where the null value comes from and it seems that
> > > > > KTableReduce.process() is getting called with Change<V> value with
> > > > > newValue == null, and some non-null oldValue, which leads to the
> > > > > subtractor being called with null aggregator value. I wonder how it is
> > > > > possible to have an old value for a key without a new value (does it
> > > > > happen because of the auto commit interval?).
> > > > >
> > > > > I've also noticed that it's possible for an input value from a topic
> > > > > to bypass aggregation function entirely and be directly transmitted to
> > > > > the output in certain cases: oldAgg is null, newValue is not null and
> > > > > oldValue is null - in that case newValue will be transmitted directly
> > > > > to the output. I suppose it's the correct behaviour, but feels a bit
> > > > > weird nonetheless. And I've actually been able to observe this
> > > > > behaviour in practice. I suppose it's also caused by this happening
> > > > > right before a commit happens, and the message is sent to a changelog
> > > > > topic.
> > > > >
> > > > > Please can someone with more knowledge shed some light on these issues?
> > > > >
> > > > > --
> > > > > Best regards,
> > > > > Vasily Sulatskov
> > > > >
> > >
> > >
> > >
> > > --
> > > Best regards,
> > > Vasily Sulatskov
> > >
>
>
>
> --
> Best regards,
> Vasily Sulatskov
>

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by Vasily Sulatskov <va...@sulatskov.net>.
Hi John,

Thanks for your explanation.

I have an answer to the practical question, i.e. a null aggregator
value should be interpreted as a fatal application error.

On the other hand, looking at the app topology, I see that a message
from KSTREAM-REDUCE-0000000002 / "table" goes to
KTABLE-SELECT-0000000006, which in turn forwards data to
KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition). At
this point I assume the data goes back to kafka into a *-repartition
topic. After that the message is read from kafka by
KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
[aggregated-table]), where the actual aggregation takes place. What I
don't get is where this Change value comes from. Even if it's been
produced by KSTREAM-REDUCE-0000000002, that shouldn't matter, as the
message goes through kafka where it gets serialized, and looking at
the kafka "repartition" topic, it contains regular values, not a pair of
old/new.

As far as I understand, Change is a purely in-memory representation of
the state for a particular key, and at no point is it serialized back
to kafka, yet somehow these Change values make it to the reducer. I feel
like I am missing something here. Could you please clarify this?

Can you please point me to the place in the kafka-streams sources where a
Change of newValue/oldValue is produced, so I could take a look? I
found the KTableReduce implementation, but can't find what creates these
Change values.

On Fri, Jul 13, 2018 at 6:17 PM John Roesler <jo...@confluent.io> wrote:
>
> Hi again Vasily,
>
> Ok, it looks to me like this behavior is the result of the un-clean
> topology change.
>
> Just in case you're interested, here's what I think happened.
>
> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table" )
> internally emits pairs of "oldValue"/"newValue" . (side-note: It's by
> forwarding both the old and new value that we are able to maintain
> aggregates using the subtractor/adder pairs)
>
> 2. In the full topology, these old/new pairs go through some
> transformations, but still in some form eventually make their way down to
> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
>
> 3. The reduce processor logic looks like this:
> final V oldAgg = store.get(key);
> V newAgg = oldAgg;
>
> // first try to add the new value
> if (value.newValue != null) {
>     if (newAgg == null) {
>         newAgg = value.newValue;
>     } else {
>         newAgg = addReducer.apply(newAgg, value.newValue);
>     }
> }
>
> // then try to remove the old value
> if (value.oldValue != null) {
>     // Here's where the assumption breaks down...
>     newAgg = removeReducer.apply(newAgg, value.oldValue);
> }
>
> 4. Here's what I think happened. This processor saw an event like
> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block, and
> (since "oldValue != null") would go into the second block. I think that in
> the normal case we can rely on the invariant that any value we get as an
> "oldValue" is one that we've previously seen ( as "newValue" ).
> Consequently, we should be able to assume that if we get a non-null
> "oldValue", "newAgg" will also not be null (because we would have written
> it to the store back when we saw it as "newValue" and then retrieved it
> just now as "newAgg = oldAgg").
>
> However, if subtopology2, along with KTABLE-SELECT-0000000006
> and KSTREAM-SINK-0000000013 get added after (KSTREAM-REDUCE-0000000002 /
> "table") has already emitted some values, then we might in fact receive an
> event with some "oldValue" that we have in fact never seen before (because (
> KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when it
> was first emitted as a "newValue").
>
> This would violate our assumption, and we would unintentionally send a
> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
> If you want to double-check my reasoning, you should be able to do so in
> the debugger with a breakpoint in KTableReduce.
>
>
> tl;dr: Supposing you reset the app when the topology changes, I think that
> you should be able to rely on non-null aggregates being passed in to *both*
> the adder and subtractor in a reduce.
>
> I would be in favor, as you suggested, of adding an explicit check and
> throwing an exception if the aggregate is ever null at those points. This
> would actually help us detect if the topology has changed unexpectedly and
> shut down, hopefully before any damage is done. I'll send a PR and see what
> everyone thinks.
>
> Does this all seem like it adds up to you?
> -John
>
>
> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
> wrote:
>
> > Hi John,
> >
> > Thanks for your reply. I am not sure if this behavior I've observed is
> > a bug or not, as I've not been resetting my application properly. On
> > the other hand if the subtractor or adder in the reduce operation are
> > never supposed to be called with null aggregator value, perhaps it
> > would make sense to put a null check in the table reduce
> > implementation to detect an application entering an invalid state. A
> > bit like a check for topics having the same number of partitions when
> > doing a join.
> >
> > Here's some information about my tests. Hope that can be useful:
> >
> > Topology at start:
> >
> > 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
> >    Sub-topology: 0
> >     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >       --> KSTREAM-MAP-0000000001
> >     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >       --> KSTREAM-FILTER-0000000004
> >       <-- KSTREAM-SOURCE-0000000000
> >     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >       --> KSTREAM-SINK-0000000003
> >       <-- KSTREAM-MAP-0000000001
> >     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >       <-- KSTREAM-FILTER-0000000004
> >
> >   Sub-topology: 1
> >     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >       --> KSTREAM-REDUCE-0000000002
> >     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >       --> KTABLE-TOSTREAM-0000000006
> >       <-- KSTREAM-SOURCE-0000000005
> >     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> >       --> KSTREAM-SINK-0000000007
> >       <-- KSTREAM-REDUCE-0000000002
> >     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
> >       <-- KTABLE-TOSTREAM-0000000006
> >
> > This topology just takes data from the source topic "slope" which
> > produces messages like this:
> >
> > key1
> > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > key3
> > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > key2
> > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > key3
> > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > key1
> > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > key2
> > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >
> > Every second, 3 new messages arrive from the "slope" topic for
> > keys key1, key2 and key3, with a constantly increasing value.
> > The data is transformed so that the original key is also tracked in the
> > message value, grouped by key, windowed with a custom window, and
> > reduced with a dummy reduce operation to make a KTable.
> > The KTable is converted back to a stream and sent to a topic (just for
> > debugging purposes).
> >
> > Here's the source (it's kafka-streams-scala for the most part). Also
> > please ignore silly classes, it's obviously a test:
> >
> >     val slopeTable = builder
> >       .stream[String, TimedValue]("slope")
> >       .map(
> >         (key, value) =>
> >           (
> >             StringWrapper(key),
> >             TimedValueWithKey(value = value.value, timestamp = value.timestamp, key = key)
> >           )
> >       )
> >       .groupByKey
> >       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
> >       .reduceMat((aggValue, newValue) => newValue, "table")
> >
> >     slopeTable.toStream
> >       .to("slope-table")
> >
> > Topology after change without a proper reset:
> >
> > 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
> >    Sub-topology: 0
> >     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >       --> KSTREAM-MAP-0000000001
> >     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >       --> KSTREAM-FILTER-0000000004
> >       <-- KSTREAM-SOURCE-0000000000
> >     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >       --> KSTREAM-SINK-0000000003
> >       <-- KSTREAM-MAP-0000000001
> >     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >       <-- KSTREAM-FILTER-0000000004
> >
> >   Sub-topology: 1
> >     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >       --> KSTREAM-REDUCE-0000000002
> >     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
> >       <-- KSTREAM-SOURCE-0000000005
> >     Processor: KTABLE-SELECT-0000000006 (stores: [])
> >       --> KSTREAM-SINK-0000000007
> >       <-- KSTREAM-REDUCE-0000000002
> >     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
> >       --> KSTREAM-SINK-0000000013
> >       <-- KSTREAM-REDUCE-0000000002
> >     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
> >       <-- KTABLE-SELECT-0000000006
> >     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
> >       <-- KTABLE-TOSTREAM-0000000012
> >
> >   Sub-topology: 2
> >     Source: KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition])
> >       --> KTABLE-REDUCE-0000000009
> >     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
> >       --> KTABLE-TOSTREAM-0000000010
> >       <-- KSTREAM-SOURCE-0000000008
> >     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
> >       --> KSTREAM-SINK-0000000011
> >       <-- KTABLE-REDUCE-0000000009
> >     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
> >       <-- KTABLE-TOSTREAM-0000000010
> >
> > Here's the source of the sub-topology that does table aggregation:
> >
> >     slopeTable
> >       .groupBy(
> >         (key, value) => (new Windowed(StringWrapper("dummykey"), key.window()), value)
> >       )
> >       .reduceMat(adder = (aggValue, newValue) => {
> >         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
> >         val agg = Option(aggValue)
> >         TimedValueWithKey(
> >           value = agg.map(_.value).getOrElse(0) + newValue.value,
> >           timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
> >           key = "reduced"
> >         )
> >       }, subtractor = (aggValue, newValue) => {
> >         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
> >         val agg = Option(aggValue)
> >         TimedValueWithKey(
> >           value = agg.map(_.value).getOrElse(0) - newValue.value,
> >           timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
> >           key = "reduced"
> >         )
> >       }, "aggregated-table")
> >       .toStream
> >       .to("slope-aggregated-table")
> >
> > I log all calls to the adder and subtractor, so I am able to see what's
> > going on there. I also track the original keys of the aggregated
> > values and their timestamps, so it's relatively easy to see how the
> > data goes through this topology.
> >
> > In order to reproduce this behavior I need to:
> > 1. Start a full topology (with table aggregation)
> > 2. Start without table aggregation (no app reset)
> > 3. Start with table aggregation (no app reset)
> >
> > Below is an interpretation of the adder/subtractor logs for a given
> > key/window, in chronological order:
> >
> > SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> > ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> > SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> > ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> > SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> > ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
> >
> > And in the end the last value that's materialized for that window
> > (i.e. windowed key) in the kafka topic is 57, i.e. the increase in value
> > for a single key between some point in the middle of the window and
> > the end of the window (751 - 732 = 19), times 3. As opposed to the expected
> > value of 751 * 3 = 2253 (sum of last values in a time window for all keys
> > being aggregated).
> >
> > It's clear to me that I should do an application reset, but I also
> > would like to understand, should I expect adder/subtractor being
> > called with null aggValue, or is it a clear sign that something went
> > horribly wrong?
> >
> > On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io> wrote:
> > >
> > > Hi Vasily,
> > >
> > > Thanks for the email.
> > >
> > > To answer your question: you should reset the application basically any
> > > time you change the topology. Some transitions are safe, but others will
> > > result in data loss or corruption. Rather than try to reason about which is
> > > which, it's much safer just to either reset the app or not change it (if it
> > > has important state).
> > >
> > > Beyond changes that you make to the topology, we spend a lot of effort to
> > > try and make sure that different versions of Streams will produce the same
> > > topology, so unless the release notes say otherwise, you should be able to
> > > upgrade without a reset.
> > >
> > >
> > > I can't say right now whether those wacky behaviors are bugs or the result
> > > of changing the topology without a reset. Or if they are correct but
> > > surprising behavior somehow. I'll look into it tomorrow. Do feel free to
> > > open a Jira ticket if you think you have found a bug, especially if you can
> > > describe a repro. Knowing your topology before and after the change would
> > > also be immensely helpful. You can print it with Topology.describe().
> > >
> > > Regardless, I'll make a note to take a look at the code tomorrow and try to
> > > decide if you should expect these behaviors with "clean" topology changes.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <va...@sulatskov.net>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am doing some experiments with kafka-streams KGroupedTable
> > > > aggregation, and admittedly I am not wiping data properly on each
> > > > restart, partially because I also wonder what would happen if you
> > > > change a streams topology without doing a proper reset.
> > > >
> > > > I've noticed that from time to time, kafka-streams
> > > > KGroupedTable.reduce() can call subtractor function with null
> > > > aggregator value, and if you try to work around that, by interpreting
> > > > null aggregator value as zero for numeric value you get incorrect
> > > > aggregation result.
> > > >
> > > > I do understand that the proper way of handling this is to do a reset
> > > > on topology changes, but I'd like to understand if there's any
> > > > legitimate case when kafka-streams can call an adder or a subtractor
> > > > with null aggregator value, and should I plan for this, or should I
> > > > interpret this as an invalid state, and terminate the application, and
> > > > do a proper reset?
> > > >
> > > > Also, I can't seem to find a guide which explains when application
> > > > reset is necessary. Intuitively it seems that it should be done every
> > > > time a topology changes. Any other cases?
> > > >
> > > > I tried to debug where the null value comes from and it seems that
> > > > KTableReduce.process() is getting called with Change<V> value with
> > > > newValue == null, and some non-null oldValue, which leads to the
> > > > subtractor being called with null aggregator value. I wonder how it is
> > > > possible to have an old value for a key without a new value (does it
> > > > happen because of the auto commit interval?).
> > > >
> > > > I've also noticed that it's possible for an input value from a topic
> > > > to bypass aggregation function entirely and be directly transmitted to
> > > > the output in certain cases: oldAgg is null, newValue is not null and
> > > > oldValue is null - in that case newValue will be transmitted directly
> > > > to the output. I suppose it's the correct behaviour, but feels a bit
> > > > weird nonetheless. And I've actually been able to observe this
> > > > behaviour in practice. I suppose it's also caused by this happening
> > > > right before a commit happens, and the message is sent to a changelog
> > > > topic.
> > > >
> > > > Please can someone with more knowledge shed some light on these issues?
> > > >
> > > > --
> > > > Best regards,
> > > > Vasily Sulatskov
> > > >
> >
> >
> >
> > --
> > Best regards,
> > Vasily Sulatskov
> >



-- 
Best regards,
Vasily Sulatskov

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by John Roesler <jo...@confluent.io>.
Hi again Vasily,

Ok, it looks to me like this behavior is the result of the un-clean
topology change.

Just in case you're interested, here's what I think happened.

1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table")
internally emits pairs of "oldValue"/"newValue". (Side note: it's by
forwarding both the old and new value that we are able to maintain
aggregates using the subtractor/adder pairs.)

2. In the full topology, these old/new pairs go through some
transformations, but still in some form eventually make their way down to
the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").

3. The reduce processor logic looks like this:
final V oldAgg = store.get(key);
V newAgg = oldAgg;

// first try to add the new value
if (value.newValue != null) {
    if (newAgg == null) {
        newAgg = value.newValue;
    } else {
        newAgg = addReducer.apply(newAgg, value.newValue);
    }
}

// then try to remove the old value
if (value.oldValue != null) {
    // Here's where the assumption breaks down...
    newAgg = removeReducer.apply(newAgg, value.oldValue);
}

4. Here's what I think happened. This processor saw an event like
{new=null, old=(key2, 732, 10:50:40)}. This would skip the first block, and
(since "oldValue != null") would go into the second block. I think that in
the normal case we can rely on the invariant that any value we get as an
"oldValue" is one that we've previously seen (as "newValue").
Consequently, we should be able to assume that if we get a non-null
"oldValue", "newAgg" will also not be null (because we would have written
it to the store back when we saw it as "newValue" and then retrieved it
just now as "newAgg = oldAgg").

However, if subtopology2, along with KTABLE-SELECT-0000000006
and KSTREAM-SINK-0000000013, gets added after (KSTREAM-REDUCE-0000000002 /
"table") has already emitted some values, then we might receive an
event with some "oldValue" that we have in fact never seen before (because
(KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when it
was first emitted as a "newValue").

This would violate our assumption, and we would unintentionally send a
"null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
If you want to double-check my reasoning, you should be able to do so in
the debugger with a breakpoint in KTableReduce.
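
If you'd rather check the arithmetic without a debugger, here is a minimal,
self-contained sketch that replays the events from your log. It is not the
actual Streams code: the class and method names (ReduceReplay, process,
replay) are made up for illustration, plain Integers stand in for your
TimedValueWithKey, and I'm assuming the old and new halves of each update
arrive as separate events, as your SUB/ADD log lines suggest. The reducers
treat a null aggregate as zero, like your workaround does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Simplified stand-in for the reduce processor; NOT the actual Streams code.
class ReduceReplay {

    // Either side may be null, mirroring the old/new pair described above.
    static final class Change {
        final Integer newValue;
        final Integer oldValue;

        Change(Integer newValue, Integer oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Null-tolerant reducers: a null aggregate is interpreted as zero.
    static final BinaryOperator<Integer> ADDER = (agg, v) -> (agg == null ? 0 : agg) + v;
    static final BinaryOperator<Integer> SUBTRACTOR = (agg, v) -> (agg == null ? 0 : agg) - v;

    // Same control flow as the processor logic quoted in step 3.
    static void process(Map<String, Integer> store, String key, Change value) {
        Integer newAgg = store.get(key);
        if (value.newValue != null) {
            newAgg = (newAgg == null) ? value.newValue : ADDER.apply(newAgg, value.newValue);
        }
        if (value.oldValue != null) {
            newAgg = SUBTRACTOR.apply(newAgg, value.oldValue);
        }
        store.put(key, newAgg);
    }

    // Replays three upstream keys, each updating from 732 to 751 in one window.
    static int replay(boolean sawFirstEmission) {
        Map<String, Integer> store = new HashMap<>();
        String key = "dummykey";
        for (int i = 0; i < 3; i++) {
            if (sawFirstEmission) {
                process(store, key, new Change(732, null)); // 732 first seen as a "newValue"
            }
            process(store, key, new Change(null, 732));     // retract the old value
            process(store, key, new Change(751, null));     // add the new value
        }
        return store.get(key);
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // 57: the reducer never saw 732 as a "newValue"
        System.out.println(replay(true));  // 2253: the reducer saw every value it later retracts
    }
}
```

With sawFirstEmission = false the intermediate aggregates are -732, 19, -713,
38, -694, 57, which is the same sequence as in your log.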


tl;dr: Supposing you reset the app when the topology changes, I think that
you should be able to rely on non-null aggregates being passed in to *both*
the adder and subtractor in a reduce.

I would be in favor, as you suggested, of adding an explicit check and
throwing an exception if the aggregate is ever null at those points. This
would actually help us detect if the topology has changed unexpectedly and
shut down, hopefully before any damage is done. I'll send a PR and see what
everyone thinks.
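
Until something like that exists in Streams itself, you could put the same
guard in your own adder and subtractor. The following is only a sketch of
the idea: failOnNullAggregate is a hypothetical helper, not part of the
Kafka Streams API, and I'm using java.util.function.BinaryOperator in place
of the Streams Reducer interface so that it compiles on its own.

```java
import java.util.function.BinaryOperator;

// Hypothetical user-side guard; NOT part of the Kafka Streams API.
class NullAggregateGuard {

    // Wraps an adder or subtractor so that a null aggregate fails fast
    // instead of being silently folded into the result.
    static <V> BinaryOperator<V> failOnNullAggregate(String name, BinaryOperator<V> delegate) {
        return (aggregate, value) -> {
            if (aggregate == null) {
                throw new IllegalStateException(name + " called with a null aggregate; "
                        + "was the topology changed without an application reset?");
            }
            return delegate.apply(aggregate, value);
        };
    }

    public static void main(String[] args) {
        BinaryOperator<Integer> subtractor =
                failOnNullAggregate("subtractor", (agg, v) -> agg - v);
        System.out.println(subtractor.apply(751, 732)); // 19
        try {
            subtractor.apply(null, 732);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing from the reducer should stop the stream thread rather than let a
bogus aggregate get written to the store and the changelog.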

Does this all seem like it adds up to you?
-John


On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <va...@sulatskov.net>
wrote:

> Hi John,
>
> Thanks for your reply. I am not sure if this behavior I've observed is
> a bug or not, as I've not been resetting my application properly. On
> the other hand if the subtractor or adder in the reduce operation are
> never supposed to be called with null aggregator value, perhaps it
> would make sense to put a null check in the table reduce
> implementation to detect an application entering an invalid state. A
> bit like a check for topics having the same number of partitions when
> doing a join.
>
> Here's some information about my tests. Hope that can be useful:
>
> Topology at start:
>
> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>       --> KSTREAM-MAP-0000000001
>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>       --> KSTREAM-FILTER-0000000004
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>       --> KSTREAM-SINK-0000000003
>       <-- KSTREAM-MAP-0000000001
>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>       <-- KSTREAM-FILTER-0000000004
>
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>       --> KSTREAM-REDUCE-0000000002
>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>       --> KTABLE-TOSTREAM-0000000006
>       <-- KSTREAM-SOURCE-0000000005
>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-REDUCE-0000000002
>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
>       <-- KTABLE-TOSTREAM-0000000006
>
> This topology just takes data from the source topic "slope" which
> produces messages like this:
>
> key1
> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> key3
> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> key2
> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> key3
> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> key1
> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> key2
> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>
> Every second, 3 new messages arrive from the "slope" topic for
> keys key1, key2 and key3, with a constantly increasing value.
> The data is transformed so that the original key is also tracked in the
> message value, grouped by key, windowed with a custom window, and
> reduced with a dummy reduce operation to make a KTable.
> The KTable is converted back to a stream and sent to a topic (just for
> debugging purposes).
>
> Here's the source (it's kafka-streams-scala for the most part). Also
> please ignore silly classes, it's obviously a test:
>
>     val slopeTable = builder
>       .stream[String, TimedValue]("slope")
>       .map(
>         (key, value) =>
>           (
>             StringWrapper(key),
>             TimedValueWithKey(value = value.value, timestamp = value.timestamp, key = key)
>           )
>       )
>       .groupByKey
>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
>       .reduceMat((aggValue, newValue) => newValue, "table")
>
>     slopeTable.toStream
>       .to("slope-table")
>
> Topology after change without a proper reset:
>
> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>       --> KSTREAM-MAP-0000000001
>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>       --> KSTREAM-FILTER-0000000004
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>       --> KSTREAM-SINK-0000000003
>       <-- KSTREAM-MAP-0000000001
>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>       <-- KSTREAM-FILTER-0000000004
>
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>       --> KSTREAM-REDUCE-0000000002
>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
>       <-- KSTREAM-SOURCE-0000000005
>     Processor: KTABLE-SELECT-0000000006 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-REDUCE-0000000002
>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
>       --> KSTREAM-SINK-0000000013
>       <-- KSTREAM-REDUCE-0000000002
>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
>       <-- KTABLE-SELECT-0000000006
>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
>       <-- KTABLE-TOSTREAM-0000000012
>
>   Sub-topology: 2
>     Source: KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition])
>       --> KTABLE-REDUCE-0000000009
>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
>       --> KTABLE-TOSTREAM-0000000010
>       <-- KSTREAM-SOURCE-0000000008
>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
>       --> KSTREAM-SINK-0000000011
>       <-- KTABLE-REDUCE-0000000009
>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
>       <-- KTABLE-TOSTREAM-0000000010
>
> Here's the source of the sub-topology that does table aggregation:
>
>     slopeTable
>       .groupBy(
>         (key, value) => (new Windowed(StringWrapper("dummykey"), key.window()), value)
>       )
>       .reduceMat(adder = (aggValue, newValue) => {
>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
>         val agg = Option(aggValue)
>         TimedValueWithKey(
>           value = agg.map(_.value).getOrElse(0) + newValue.value,
>           timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
>           key = "reduced"
>         )
>       }, subtractor = (aggValue, newValue) => {
>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
>         val agg = Option(aggValue)
>         TimedValueWithKey(
>           value = agg.map(_.value).getOrElse(0) - newValue.value,
>           timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
>           key = "reduced"
>         )
>       }, "aggregated-table")
>       .toStream
>       .to("slope-aggregated-table")
>
> I log all calls to the adder and subtractor, so I am able to see what's
> going on there. I also track the original keys of the aggregated
> values and their timestamps, so it's relatively easy to see how the
> data goes through this topology.
>
> In order to reproduce this behavior I need to:
> 1. Start a full topology (with table aggregation)
> 2. Start without table aggregation (no app reset)
> 3. Start with table aggregation (no app reset)
>
> Below is an interpretation of the adder/subtractor logs for a given
> key/window, in chronological order:
>
> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
>
> And in the end the last value that's materialized for that window
> (i.e. windowed key) in the kafka topic is 57, i.e. the increase in value
> for a single key between some point in the middle of the window and
> the end of the window (751 - 732 = 19), times 3. As opposed to the expected
> value of 751 * 3 = 2253 (sum of last values in a time window for all keys
> being aggregated).
>
> It's clear to me that I should do an application reset, but I also
> would like to understand, should I expect adder/subtractor being
> called with null aggValue, or is it a clear sign that something went
> horribly wrong?
>
> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io> wrote:
> >
> > Hi Vasily,
> >
> > Thanks for the email.
> >
> > To answer your question: you should reset the application basically any
> > time you change the topology. Some transitions are safe, but others will
> > result in data loss or corruption. Rather than try to reason about which is
> > which, it's much safer just to either reset the app or not change it (if it
> > has important state).
> >
> > Beyond changes that you make to the topology, we spend a lot of effort to
> > try and make sure that different versions of Streams will produce the same
> > topology, so unless the release notes say otherwise, you should be able to
> > upgrade without a reset.
> >
> >
> > I can't say right now whether those wacky behaviors are bugs or the result
> > of changing the topology without a reset. Or if they are correct but
> > surprising behavior somehow. I'll look into it tomorrow. Do feel free to
> > open a Jira ticket if you think you have found a bug, especially if you can
> > describe a repro. Knowing your topology before and after the change would
> > also be immensely helpful. You can print it with Topology.describe().
> >
> > Regardless, I'll make a note to take a look at the code tomorrow and try
> to
> > decide if you should expect these behaviors with "clean" topology
> changes.
> >
> > Thanks,
> > -John
> >
> > On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <va...@sulatskov.net>
> > wrote:
> >
> > > Hi,
> > >
> > > I am doing some experiments with kafka-streams KGroupedTable
> > > aggregation, and admittedly I am not wiping data properly on each
> > > restart, partially because I also wonder what would happen if you
> > > change a streams topology without doing a proper reset.
> > >
> > > I've noticed that from time to time, kafka-streams
> > > KGroupedTable.reduce() can call subtractor function with null
> > > aggregator value, and if you try to work around that, by interpreting
> > > null aggregator value as zero for numeric value you get incorrect
> > > aggregation result.
> > >
> > > I do understand that the proper way of handling this is to do a reset
> > > on topology changes, but I'd like to understand if there's any
> > > legitimate case when kafka-streams can call an adder or a subtractor
> > > with null aggregator value, and should I plan for this, or should I
> > > interpret this as an invalid state, and terminate the application, and
> > > do a proper reset?
> > >
> > > Also, I can't seem to find a guide which explains when application
> > > reset is necessary. Intuitively it seems that it should be done every
> > > time a topology changes. Any other cases?
> > >
> > > I tried to debug where the null value comes from and it seems that
> > > KTableReduce.process() is getting called with Change<V> value with
> > > newValue == null, and some non-null oldValue. Which leads to the
> > > subtractor being called with null aggregator value. I wonder how it is
> > > possible to have an old value for a key without a new value (does it
> > > happen because of the auto commit interval?).
> > >
> > > I've also noticed that it's possible for an input value from a topic
> > > to bypass aggregation function entirely and be directly transmitted to
> > > the output in certain cases: oldAgg is null, newValue is not null and
> > > oldValue is null - in that case newValue will be transmitted directly
> > > to the output. I suppose it's the correct behaviour, but feels a bit
> > > weird nonetheless. And I've actually been able to observe this
> > > behaviour in practice. I suppose it's also caused by this happening
> > > right before a commit happens, and the message is sent to a changelog
> > > topic.
> > >
> > > Please can someone with more knowledge shed some light on these issues?
> > >
> > > --
> > > Best regards,
> > > Vasily Sulatskov
> > >
>
>
>
> --
> Best regards,
> Vasily Sulatskov
>

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by Vasily Sulatskov <va...@sulatskov.net>.
Hi John,

Thanks for your reply. I am not sure if this behavior I've observed is
a bug or not, as I've not been resetting my application properly. On
the other hand if the subtractor or adder in the reduce operation are
never supposed to be called with null aggregator value, perhaps it
would make sense to put a null check in the table reduce
implementation to detect an application entering an invalid state. A
bit like a check for topics having the same number of partitions when
doing a join.
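
To illustrate the kind of check I mean, here is a minimal sketch done on
the application side rather than inside kafka-streams. The names
StrictReducers, failOnNullAgg and Total are hypothetical and only for
illustration; the wrapper takes any two-argument reducer and throws
instead of silently treating a null aggregate as zero:

```scala
object StrictReducers {
  // Hypothetical value type, only here to make the sketch self-contained.
  final case class Total(value: Int)

  // Wraps a two-argument reducer (aggregate, value) => aggregate so that a
  // null aggregate raises an error instead of being interpreted as zero.
  // `name` is only used to make the error message readable.
  def failOnNullAgg[V <: AnyRef](name: String)(f: (V, V) => V): (V, V) => V =
    (aggValue, value) => {
      if (aggValue == null) {
        throw new IllegalStateException(
          s"$name called with null aggregate for value $value; state may be inconsistent"
        )
      }
      f(aggValue, value)
    }
}
```

A wrapped function could then be passed wherever the plain adder or
subtractor is passed today. Whether throwing is the right reaction
depends on whether a null aggregate is ever legitimate, which is exactly
what I am trying to find out.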

Here's some information about my tests. Hope that can be useful:

Topology at start:

2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-MAP-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
      <-- KSTREAM-FILTER-0000000004

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
      --> KSTREAM-REDUCE-0000000002
    Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
      --> KTABLE-TOSTREAM-0000000006
      <-- KSTREAM-SOURCE-0000000005
    Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-REDUCE-0000000002
    Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
      <-- KTABLE-TOSTREAM-0000000006

This topology just takes data from the source topic "slope" which
produces messages like this:

key1    {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
key3    {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
key2    {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
key3    {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
key1    {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
key2    {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}

Every second, 3 new messages arrive from the "slope" topic, one each
for keys key1, key2 and key3, with a constantly increasing value.
The data is transformed so that the original key is also tracked in the
message value, then grouped by key, windowed with a custom window, and
reduced with a dummy reduce operation to make a KTable.
The KTable is converted back to a stream and sent to a topic (just for
debugging purposes).

Here's the source (it's kafka-streams-scala for the most part). Also
please ignore silly classes, it's obviously a test:

    val slopeTable = builder
      .stream[String, TimedValue]("slope")
      .map(
        (key, value) =>
          (
            StringWrapper(key),
            TimedValueWithKey(value = value.value, timestamp = value.timestamp, key = key)
          )
      )
      .groupByKey
      .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
      .reduceMat((aggValue, newValue) => newValue, "table")

    slopeTable.toStream
      .to("slope-table")

Topology after change without a proper reset:

2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-MAP-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
      <-- KSTREAM-FILTER-0000000004

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
      --> KSTREAM-REDUCE-0000000002
    Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
      --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
      <-- KSTREAM-SOURCE-0000000005
    Processor: KTABLE-SELECT-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-REDUCE-0000000002
    Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
      --> KSTREAM-SINK-0000000013
      <-- KSTREAM-REDUCE-0000000002
    Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
      <-- KTABLE-SELECT-0000000006
    Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
      <-- KTABLE-TOSTREAM-0000000012

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition])
      --> KTABLE-REDUCE-0000000009
    Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
      --> KTABLE-TOSTREAM-0000000010
      <-- KSTREAM-SOURCE-0000000008
    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
      --> KSTREAM-SINK-0000000011
      <-- KTABLE-REDUCE-0000000009
    Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
      <-- KTABLE-TOSTREAM-0000000010

Here's the source of the sub-topology that does table aggregation:

    slopeTable
      .groupBy(
        // Re-key every record of a window to the same dummy key
        (key, value) => (new Windowed(StringWrapper("dummykey"), key.window()), value)
      )
      .reduceMat(adder = (aggValue, newValue) => {
        log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
        // Option(null) is None, so a null aggregate is treated as 0
        val agg = Option(aggValue)
        TimedValueWithKey(
          value = agg.map(_.value).getOrElse(0) + newValue.value,
          timestamp = Utils.latestTimestamp(
            agg.map(_.timestamp).getOrElse(zeroTimestamp),
            newValue.timestamp
          ),
          key = "reduced"
        )
      }, subtractor = (aggValue, newValue) => {
        log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
        // Same null-as-zero workaround as in the adder
        val agg = Option(aggValue)
        TimedValueWithKey(
          value = agg.map(_.value).getOrElse(0) - newValue.value,
          timestamp = Utils.latestTimestamp(
            agg.map(_.timestamp).getOrElse(zeroTimestamp),
            newValue.timestamp
          ),
          key = "reduced"
        )
      }, "aggregated-table")
      .toStream
      .to("slope-aggregated-table")

I log all calls to the adder and the subtractor, so I am able to see
what's going on there. I also track the original keys of the aggregated
values and their timestamps, so it's relatively easy to see how the
data goes through this topology.

In order to reproduce this behavior I need to:
1. Start a full topology (with table aggregation)
2. Start without table aggregation (no app reset)
3. Start with table aggregation (no app reset)

Below is an interpretation of the adder/subtractor logs for a given
key/window, in chronological order:

SUB: newValue=(key2, 732, 10:50:40) aggValue=null
ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)

In the end, the last value materialized for that window (i.e. for
that windowed key) in the kafka topic is 57, i.e. the increase in
value for a single key between some point in the middle of the window
and the end of the window (751 - 732 = 19), times 3. The expected
value is 751 * 3 = 2253 (the sum of the last values in the time window
over all keys being aggregated).
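
The arithmetic can be replayed outside of kafka-streams. This is a
minimal sketch, assuming only the six logged calls above and the same
"null aggregate counts as 0" rule as in my reducers; the names
NullAsZeroReplay, Op, Sub, Add and step are hypothetical, and None
stands in for the null aggregate:

```scala
object NullAsZeroReplay {
  // Hypothetical stand-in for one adder/subtractor log line.
  sealed trait Op
  final case class Sub(value: Int) extends Op
  final case class Add(value: Int) extends Op

  // Same rule as in the reducers: a missing aggregate is treated as 0.
  def step(agg: Option[Int], op: Op): Option[Int] = op match {
    case Sub(v) => Some(agg.getOrElse(0) - v)
    case Add(v) => Some(agg.getOrElse(0) + v)
  }

  // The six logged calls in order (key2, key1, key3).
  val logged: List[Op] =
    List(Sub(732), Add(751), Sub(732), Add(751), Sub(732), Add(751))

  // Aggregate after each call, starting from "no aggregate".
  val trace: List[Option[Int]] =
    logged.scanLeft(Option.empty[Int])(step).tail

  val finalValue: Int = trace.last.getOrElse(0) // 57
  val expected: Int = 751 * 3                   // 2253

  def main(args: Array[String]): Unit = {
    trace.foreach(println)
    println(s"final=$finalValue expected=$expected")
  }
}
```

The first SUB starts from nothing, so each key only ever contributes
its difference of 19 instead of its latest value of 751.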

It's clear to me that I should do an application reset, but I also
would like to understand, should I expect adder/subtractor being
called with null aggValue, or is it a clear sign that something went
horribly wrong?

On Fri, Jul 13, 2018 at 12:19 AM John Roesler <jo...@confluent.io> wrote:
>
> Hi Vasily,
>
> Thanks for the email.
>
> To answer your question: you should reset the application basically any
> time you change the topology. Some transitions are safe, but others will
> result in data loss or corruption. Rather than try to reason about which is
> which, it's much safer just to either reset the app or not change it (if it
> has important state).
>
> Beyond changes that you make to the topology, we spend a lot of effort to
> try and make sure that different versions of Streams will produce the same
> topology, so unless the release notes say otherwise, you should be able to
> upgrade without a reset.
>
>
> I can't say right now whether those wacky behaviors are bugs or the result
> of changing the topology without a reset. Or if they are correct but
> surprising behavior somehow. I'll look into it tomorrow. Do feel free to
> open a Jira ticket if you think you have found a bug, especially if you can
> describe a repro. Knowing your topology before and after the change would
> also be immensely helpful. You can print it with Topology.describe().
>
> Regardless, I'll make a note to take a look at the code tomorrow and try to
> decide if you should expect these behaviors with "clean" topology changes.
>
> Thanks,
> -John
>
> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <va...@sulatskov.net>
> wrote:
>
> > Hi,
> >
> > I am doing some experiments with kafka-streams KGroupedTable
> > aggregation, and admittedly I am not wiping data properly on each
> > restart, partially because I also wonder what would happen if you
> > change a streams topology without doing a proper reset.
> >
> > I've noticed that from time to time, kafka-streams
> > KGroupedTable.reduce() can call subtractor function with null
> > aggregator value, and if you try to work around that, by interpreting
> > null aggregator value as zero for numeric value you get incorrect
> > aggregation result.
> >
> > I do understand that the proper way of handling this is to do a reset
> > on topology changes, but I'd like to understand if there's any
> > legitimate case when kafka-streams can call an adder or a subtractor
> > with null aggregator value, and should I plan for this, or should I
> > interpret this as an invalid state, and terminate the application, and
> > do a proper reset?
> >
> > Also, I can't seem to find a guide which explains when application
> > reset is necessary. Intuitively it seems that it should be done every
> > time a topology changes. Any other cases?
> >
> > I tried to debug where the null value comes from and it seems that
> > KTableReduce.process() is getting called with Change<V> value with
> > newValue == null, and some non-null oldValue. Which leads to the
> > subtractor being called with null aggregator value. I wonder how it is
> > possible to have an old value for a key without a new value (does it
> > happen because of the auto commit interval?).
> >
> > I've also noticed that it's possible for an input value from a topic
> > to bypass aggregation function entirely and be directly transmitted to
> > the output in certain cases: oldAgg is null, newValue is not null and
> > oldValue is null - in that case newValue will be transmitted directly
> > to the output. I suppose it's the correct behaviour, but feels a bit
> > weird nonetheless. And I've actually been able to observe this
> > behaviour in practice. I suppose it's also caused by this happening
> > right before a commit happens, and the message is sent to a changelog
> > topic.
> >
> > Please can someone with more knowledge shed some light on these issues?
> >
> > --
> > Best regards,
> > Vasily Sulatskov
> >



-- 
Best regards,
Vasily Sulatskov

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

Posted by John Roesler <jo...@confluent.io>.
Hi Vasily,

Thanks for the email.

To answer your question: you should reset the application basically any
time you change the topology. Some transitions are safe, but others will
result in data loss or corruption. Rather than try to reason about which is
which, it's much safer just to either reset the app or not change it (if it
has important state).

Beyond changes that you make to the topology, we spend a lot of effort to
try and make sure that different versions of Streams will produce the same
topology, so unless the release notes say otherwise, you should be able to
upgrade without a reset.


I can't say right now whether those wacky behaviors are bugs or the result
of changing the topology without a reset. Or if they are correct but
surprising behavior somehow. I'll look into it tomorrow. Do feel free to
open a Jira ticket if you think you have found a bug, especially if you can
describe a repro. Knowing your topology before and after the change would
also be immensely helpful. You can print it with Topology.describe().

Regardless, I'll make a note to take a look at the code tomorrow and try to
decide if you should expect these behaviors with "clean" topology changes.

Thanks,
-John

On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <va...@sulatskov.net>
wrote:

> Hi,
>
> I am doing some experiments with kafka-streams KGroupedTable
> aggregation, and admittedly I am not wiping data properly on each
> restart, partially because I also wonder what would happen if you
> change a streams topology without doing a proper reset.
>
> I've noticed that from time to time, kafka-streams
> KGroupedTable.reduce() can call subtractor function with null
> aggregator value, and if you try to work around that, by interpreting
> null aggregator value as zero for numeric value you get incorrect
> aggregation result.
>
> I do understand that the proper way of handling this is to do a reset
> on topology changes, but I'd like to understand if there's any
> legitimate case when kafka-streams can call an adder or a subtractor
> with null aggregator value, and should I plan for this, or should I
> interpret this as an invalid state, and terminate the application, and
> do a proper reset?
>
> Also, I can't seem to find a guide which explains when application
> reset is necessary. Intuitively it seems that it should be done every
> time a topology changes. Any other cases?
>
> I tried to debug where the null value comes from and it seems that
> KTableReduce.process() is getting called with Change<V> value with
> newValue == null, and some non-null oldValue. Which leads to the
> subtractor being called with null aggregator value. I wonder how it is
> possible to have an old value for a key without a new value (does it
> happen because of the auto commit interval?).
>
> I've also noticed that it's possible for an input value from a topic
> to bypass aggregation function entirely and be directly transmitted to
> the output in certain cases: oldAgg is null, newValue is not null and
> oldValue is null - in that case newValue will be transmitted directly
> to the output. I suppose it's the correct behaviour, but feels a bit
> weird nonetheless. And I've actually been able to observe this
> behaviour in practice. I suppose it's also caused by this happening
> right before a commit happens, and the message is sent to a changelog
> topic.
>
> Please can someone with more knowledge shed some light on these issues?
>
> --
> Best regards,
> Vasily Sulatskov
>