Posted to dev@kafka.apache.org by John Roesler <vv...@apache.org> on 2020/03/02 15:32:58 UTC

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Hi Richard,

Thanks for the KIP!

I'm +1 (binding)

-john

On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> Hi all,
> 
> I am proposing a new optimization to Kafka Streams which would greatly
> reduce the number of idempotent updates (or no-ops) in the Kafka Streams
> DAG.
> A number of users have been interested in this feature, so it would be nice
> to pass this one in.
> 
> For information, the KIP is described below:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> 
> We aim to make Kafka Streams more efficient by adopting the "emit on
> change" reporting strategy.
> 
> Please cast your vote!
> 
> Best,
> Richard
>

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Randall Hauch <rh...@gmail.com>.
Sorry, I missed Matthias' +1 (binding). I'll move the KIP back to "Adopted"
and add it to the AK 2.6.0 release.

Apologies for the noise.

On Tue, May 26, 2020 at 12:14 PM Randall Hauch <rh...@gmail.com> wrote:

> Just a quick note: I've changed
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams to
> denote that this KIP is still in voting, as it has only received 2 binding
> votes. I will also remove the KIP from the AK 2.6.0 release, since the KIP
> freeze (May 20) has already passed, meaning even with an additional binding
> vote this KIP still would not make the AK 2.6.0 deadline.
>
> Best regards,
>
> Randall
>
> On Sat, Mar 7, 2020 at 4:53 PM Richard Yu <yo...@gmail.com>
> wrote:
>
>> Hi Matthias,
>>
>> Oh, I see. Next time, I will take that into account.
>> It looked like at the time there wasn't much contention over the major
>> points of the proposal, so I thought I could pass it.
>>
>> I will also make some last modifications to the KIP.
>>
>> Thanks for your vote!
>>
>> Best,
>> Richard
>>
>>
>> On Sat, Mar 7, 2020 at 1:00 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>> > -----BEGIN PGP SIGNED MESSAGE-----
>> > Hash: SHA512
>> >
>> > Richard,
>> >
>> > you cannot close a KIP as accepted with 2 binding votes. (cf
>> > https://cwiki.apache.org/confluence/display/KAFKA/Bylaws)
>> >
>> > You could only discard the KIP as long as it's not accepted :D
>> >
>> > However, I am +1 (binding) and thus you can close the VOTE as accepted.
>> >
>> >
>> > Just three minor follow-up comments:
>> >
>> > (1) In "Reporting Strategies" you mention in point (2) "Emit on update
>> > / non-empty content" -- I am not sure what "empty content" would be.
>> > This is a little bit confusing. Maybe just remove it?
>> >
>> >
>> > (2) "Design Reasoning"
>> >
>> > > we have decided that we will forward aggregation results if and
>> > > only if the timestamp and the value had not changed
>> >
>> > This sounds incorrect. If both value and timestamp have not changed,
>> > we would skip the update from my understanding?
>> >
>> > I.e., to phrase it differently: for a table-operation we only consider
>> > the value to make a comparison and if the value does not change, we
>> > don't emit anything (even if the timestamp changed).
>> >
>> > For windowed aggregations however, even if the value does not change,
>> > but the timestamp advances, we emit, i.e., a changing timestamp is not
>> > considered idempotent for this case. (Note that the timestamp can
>> > never go backward for this case, because it's computed as the maximum
>> > over all input records for the window.)
>> >
>> >
>> > (3) The discussion about stream time is very interesting. I agree that
>> > it's an orthogonal concern to this KIP.
>> >
>> >
>> >
>> > - -Matthias
>> >
>> >
>> > On 3/6/20 1:52 PM, Richard Yu wrote:
>> > > Hi all,
>> > >
>> > > I have decided to pass this KIP with 2 binding votes and 3
>> > > non-binding votes (including mine). I will update KIP status
>> > > shortly after this.
>> > >
>> > > Best, Richard
>> > >
>> > > On Thu, Mar 5, 2020 at 3:45 PM Richard Yu
>> > > <yo...@gmail.com> wrote:
>> > >
>> > >> Hi all,
>> > >>
>> > >> Just polling for some last changes on the name. I think that
>> > >> since there doesn't seem to be much objection to any major
>> > >> changes in the KIP, I will pass it this Friday.
>> > >>
>> > >> If you feel that we still need some more discussion, please let
>> > >> me know. :)
>> > >>
>> > >> Best, Richard
>> > >>
>> > >> P.S. Will start working on a PR for this one soon.
>> > >>
>> > >> On Wed, Mar 4, 2020 at 1:30 PM Guozhang Wang <wa...@gmail.com>
>> > >> wrote:
>> > >>
>> > >>> Regarding the metric name, I was actually trying to be
>> > >>> consistent with the node-level `suppression-emit` as I feel
>> > >>> this one's characteristics are closer to that. If other folks
>> > >>> feel it is better to align with the task-level "dropped-records",
>> > >>> I think I can be convinced too.
>> > >>>
>> > >>>
>> > >>> Guozhang
>> > >>>
>> > >>> On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna
>> > >>> <br...@confluent.io> wrote:
>> > >>>
>> > >>>> Hi all,
>> > >>>>
>> > >>>> may I make a non-binding proposal for the metric name? I
>> > >>>> would prefer "skipped-idempotent-updates" to be consistent
>> > >>>> with the "dropped-records".
>> > >>>>
>> > >>>> Best, Bruno
>> > >>>>
>> > >>>> On Tue, Mar 3, 2020 at 11:57 PM Richard Yu
>> > >>>> <yo...@gmail.com> wrote:
>> > >>>>>
>> > >>>>> Hi all,
>> > >>>>>
>> > >>>>> Thanks for the discussion!
>> > >>>>>
>> > >>>>> @Guozhang, I will make the corresponding changes to the KIP
>> > >>>>> (i.e. renaming the sensor and adding some notes). With the
>> > >>>>> current state of things, we are very close. Just need that one
>> > >>>>> last binding vote.
>> > >>>>>
>> > >>>>> @Matthias J. Sax <ma...@confluent.io>  It would be ideal if we
>> > >>>>> can also get your last two cents on this as well. Other than
>> > >>>>> that, we are good.
>> > >>>>>
>> > >>>>> Best, Richard
>> > >>>>>
>> > >>>>>
>> > >>>>> On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wa...@gmail.com> wrote:
>> > >>>>>
>> > >>>>>> Hi Bruno, John:
>> > >>>>>>
>> > >>>>>> 1) That makes sense. If we consider them to be node-specific
>> > >>>>>> metrics that only apply to a subset of built-in processor nodes
>> > >>>>>> that are irrelevant to alert-relevant metrics (just like
>> > >>>>>> suppression-emit (rate | total)), they'd better be per-node
>> > >>>>>> instead of per-task and we would not associate such events with
>> > >>>>>> a warning. With that in mind, I'd suggest we consider renaming
>> > >>>>>> the metric without the `dropped` keyword to distinguish it from
>> > >>>>>> the per-task level sensor. How about
>> > >>>>>> "idempotent-update-skip (rate | total)"?
>> > >>>>>>
>> > >>>>>> Also a minor suggestion: we should clarify in the KIP / javadocs
>> > >>>>>> which built-in processor nodes would have this metric while
>> > >>>>>> others don't.
>> > >>>>>>
>> > >>>>>> 2) About stream time tracking, there are multiple known issues
>> > >>>>>> that we should close to improve our consistency semantics:
>> > >>>>>>
>> > >>>>>> a. preserve stream time of active tasks across rebalances where
>> > >>>>>> they may be migrated. This is what KAFKA-9368
>> > >>>>>> <https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
>> > >>>>>> b. preserve stream time of standby tasks to be aligned with the
>> > >>>>>> active tasks, via the changelog topics.
>> > >>>>>>
>> > >>>>>> And what I'm more concerned about is b) here. For example: let's
>> > >>>>>> say we have a topology of `source -> A -> repartition -> B`
>> > >>>>>> where both A and B have states along with changelogs, and both
>> > >>>>>> of them have standbys. If a record is piped from the source and
>> > >>>>>> completely traversed through the topology, we need to make sure
>> > >>>>>> that the stream time inferred across:
>> > >>>>>>
>> > >>>>>> * active task A (inferred from the source record),
>> > >>>>>> * active task B (inferred from the derived record from the
>> > >>>>>>   repartition topic),
>> > >>>>>> * standby task A (inferred from the changelog topic of A's store),
>> > >>>>>> * standby task B (inferred from the changelog topic of B's store)
>> > >>>>>>
>> > >>>>>> are consistent (note I'm not saying they should be "exactly the
>> > >>>>>> same", but consistent, meaning that they may have different
>> > >>>>>> values but as long as that does not impact the time-based
>> > >>>>>> queries, it is fine). The main motivation is that on IQ, where
>> > >>>>>> both active and standby tasks could be accessed, we can
>> > >>>>>> eventually improve our consistency guarantee to have 1)
>> > >>>>>> read-your-write, 2) consistency across stores, etc.
>> > >>>>>>
>> > >>>>>> I agree with John's assessment in the previous email; this is
>> > >>>>>> just to clarify more concretely what I'm thinking.
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> Guozhang
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vv...@apache.org> wrote:
>> > >>>>>>
>> > >>>>>>> Thanks, Guozhang and Bruno!
>> > >>>>>>>
>> > >>>>>>> 2) I had a similar thought to both of you about the metrics,
>> > >>>>>>> but I ultimately came out with a conclusion like Bruno's. These
>> > >>>>>>> aren't dropped invalid records, they're intentionally dropped,
>> > >>>>>>> valid, but unnecessary, updates. A "warning" for this case
>> > >>>>>>> definitely seems wrong, and I'd also not recommend counting
>> > >>>>>>> these events along with "dropped-records", because those are
>> > >>>>>>> all dropped invalid records, e.g., late or null-keyed or
>> > >>>>>>> couldn't be deserialized.
>> > >>>>>>>
>> > >>>>>>> Like Bruno pointed out, an operator should be concerned to see
>> > >>>>>>> non-zero "dropped-records", and would then consult the logs for
>> > >>>>>>> warnings. But that same person should be happy to see
>> > >>>>>>> "dropped-idempotent-updates" increasing, since it means they're
>> > >>>>>>> saving time and money. Maybe the name of the metric could be
>> > >>>>>>> different, but I couldn't think of a better one. OTOH, maybe it
>> > >>>>>>> just stands out to us because we recently discussed those other
>> > >>>>>>> metrics in KIP-444?
>> > >>>>>>>
>> > >>>>>>> 1) Maybe we should discuss this point more. It seems like we
>> > >>>>>>> should maintain an invariant that the following three objects
>> > >>>>>>> always have exactly the same state (modulo flush boundaries):
>> > >>>>>>> 1. The internal state store
>> > >>>>>>> 2. The changelog
>> > >>>>>>> 3. The operation's result view
>> > >>>>>>>
>> > >>>>>>> That is, if I have a materialized Filter, then it seems like I
>> > >>>>>>> _must_ store exactly the same record in the store and the
>> > >>>>>>> changelog, and also forward the exact same record, including
>> > >>>>>>> the timestamp, to the downstream operations.
>> > >>>>>>>
>> > >>>>>>> If we store something different in the internal state store
>> > >>>>>>> than the changelog, we can get a situation where the state is
>> > >>>>>>> actually different after restoration than it is during
>> > >>>>>>> processing, and queries against standbys would return different
>> > >>>>>>> results than queries against the active tasks.
>> > >>>>>>>
>> > >>>>>>> Regarding storing something different in the store+changelog
>> > >>>>>>> than we forward downstream, consider the following topology:
>> > >>>>>>>
>> > >>>>>>>   sourceTable
>> > >>>>>>>     .filter(someFilter, Materialized.as("f1"))
>> > >>>>>>>     .filter((key, value) -> true, Materialized.as("f2"))
>> > >>>>>>>
>> > >>>>>>> If we didn't forward exactly the same data we store, then
>> > >>>>>>> querying f2 would return different results than querying f1,
>> > >>>>>>> which is clearly not correct, given the topology.
>> > >>>>>>>
>> > >>>>>>> It seems like maybe what you have in mind is the preservation
>> > >>>>>>> of stream time across restart/rebalance? This bug is still
>> > >>>>>>> open, actually:
>> > >>>>>>> https://issues.apache.org/jira/browse/KAFKA-9368
>> > >>>>>>> It seems like solving that bug would be independent of KIP-557.
>> > >>>>>>> I.e., KIP-557 makes that bug neither worse nor better.
>> > >>>>>>>
>> > >>>>>>> One other thought I had is maybe you were thinking that
>> > >>>>>>> operators would update their internally tracked stream time,
>> > >>>>>>> but still discard records? I think that _would_ be a bug. That
>> > >>>>>>> is, if a record gets discarded as idempotent, it should have no
>> > >>>>>>> effect at all on the state of the application.
>> > >>>>>>>
>> > >>>>>>> Reflecting on my prior analysis of stream time, most of the
>> > >>>>>>> cases where we track stream time are in Stream aggregations,
>> > >>>>>>> and in those cases, if an incoming record's timestamp is higher
>> > >>>>>>> than the previous stream time, it would already not be
>> > >>>>>>> considered idempotent. So we would store, log, and forward the
>> > >>>>>>> result with the new timestamp.
>> > >>>>>>>
>> > >>>>>>> The only other case is Suppress. With respect to idempotence,
>> > >>>>>>> Suppress is equivalent to a stateless no-op transformation. All
>> > >>>>>>> it does is collect and delay updates. It has no memory of what
>> > >>>>>>> it previously emitted, so it wouldn't be possible for it to
>> > >>>>>>> check for idempotence anyway.
>> > >>>>>>>
>> > >>>>>>> Was that what you were thinking?
>> > >>>>>>> Thanks,
>> > >>>>>>> -John
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>> On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
>> > >>>>>>>> Hi Guozhang,
>> > >>>>>>>>
>> > >>>>>>>> I also had the same thought about using the existing
>> > >>>>>>>> "dropped-records" metrics. However, I think in this case it
>> > >>>>>>>> would be better to use a new metric because dropping
>> > >>>>>>>> idempotent updates is an optimization; they do not represent
>> > >>>>>>>> missed records. The dropped idempotent updates in general do
>> > >>>>>>>> not change the result and so do not need a warn log message.
>> > >>>>>>>> Whereas dropped records due to expired windows, serialization
>> > >>>>>>>> errors, or lateness might be something concerning that needs
>> > >>>>>>>> a warn log message.
>> > >>>>>>>>
>> > >>>>>>>> Looking at the metrics, you would be happy to see
>> > >>>>>>>> "dropped-idempotent-updates" increase, because that means
>> > >>>>>>>> Streams gets rid of no-ops downstream, but you would be
>> > >>>>>>>> concerned if "dropped-records" would increase, because that
>> > >>>>>>>> means your records or the configuration of your app has
>> > >>>>>>>> issues. The "dropped-idempotent-updates" metric could also be
>> > >>>>>>>> an indication that you could further optimize your setup, by
>> > >>>>>>>> getting rid of idempotent updates further upstream.
>> > >>>>>>>>
>> > >>>>>>>> Best, Bruno
>> > >>>>>>>>
>> > >>>>>>>> On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wangguoz@gmail.com> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>> Hello Richard,
>> > >>>>>>>>>
>> > >>>>>>>>> Thanks for the KIP. I once reviewed it and was concerned
>> > >>>>>>>>> about its effects on stream time advancing. After reading
>> > >>>>>>>>> the updated KIP I think it has answered a lot of them
>> > >>>>>>>>> already.
>> > >>>>>>>>>
>> > >>>>>>>>> I have a couple minor comments still, otherwise I'm +1:
>> > >>>>>>>>>
>> > >>>>>>>>> 1) I want to clarify that for operations resulting in
>> > >>>>>>>>> KTables (not only aggregations, but consider KTable#filter
>> > >>>>>>>>> that may also result in a new KTable), even if we drop
>> > >>>>>>>>> emissions to the downstream topics we would still append to
>> > >>>>>>>>> the corresponding changelog if the timestamp has changed.
>> > >>>>>>>>> This is because the timestamps on the changelog are read by
>> > >>>>>>>>> the standby tasks, which rely on them to infer their own
>> > >>>>>>>>> stream time advancing.
>> > >>>>>>>>>
>> > >>>>>>>>> 2) About the metrics, in KIP-444 we are consolidating all
>> > >>>>>>>>> types of scenarios that can cause dropped records to the
>> > >>>>>>>>> same metrics:
>> > >>>>>>>>>
>> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > >>>>>>>>>
>> > >>>>>>>>> late-records-drop: INFO at processor node level, replaced by
>> > >>>>>>>>> INFO task-level "dropped-records".
>> > >>>>>>>>>
>> > >>>>>>>>> skipped-records: INFO at thread and processor node level,
>> > >>>>>>>>> replaced by INFO task-level "dropped-records".
>> > >>>>>>>>>
>> > >>>>>>>>> expired-window-record-drop: DEBUG at state store level,
>> > >>>>>>>>> replaced by INFO task-level "dropped-records".
>> > >>>>>>>>>
>> > >>>>>>>>> The main idea is that instead of using different metrics to
>> > >>>>>>>>> indicate different types of scenarios, users just alert on
>> > >>>>>>>>> that single metric. When the alert triggers, they can look
>> > >>>>>>>>> into the log4j logs for its causes (we made sure that all
>> > >>>>>>>>> sensor recordings of this metric would be associated with a
>> > >>>>>>>>> log4j warning).
>> > >>>>>>>>>
>> > >>>>>>>>> So I'd suggest that instead of introducing a new per-node
>> > >>>>>>>>> "dropped-idempotent-updates", we just piggy-back on the
>> > >>>>>>>>> existing task-level metric; unless we think that idempotent
>> > >>>>>>>>> drops are more frequent than others and also that they are
>> > >>>>>>>>> not worth a warning log, in which case we can consider
>> > >>>>>>>>> breaking this metric down with different tags, for example.
>> > >>>>>>>>>
>> > >>>>>>>>> Guozhang
>> > >>>>>>>>>
>> > >>>>>>>>> On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <yohan.richard.yu@gmail.com> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>>> Hi all,
>> > >>>>>>>>>>
>> > >>>>>>>>>> Thanks for the votes so far!
>> > >>>>>>>>>> @Matthias or @Guozhang Wang <gu...@confluent.io> it would be
>> > >>>>>>>>>> great to also get your input on this KIP.
>> > >>>>>>>>>>
>> > >>>>>>>>>> It looks to be pretty close to completion, so the finishing
>> > >>>>>>>>>> touches are all we need. :)
>> > >>>>>>>>>>
>> > >>>>>>>>>> Best, Richard
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <Ghassan.Yammine@bazaarvoice.com> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Hello all,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> +1 (non-binding)
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Thanks,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Ghassan
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On 3/2/20, 12:43 PM, "Bruno Cadonna" <bruno@confluent.io> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Hi Richard,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> +1 (non-binding)
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Best, Bruno
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vvcephei@apache.org> wrote:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Hi Richard,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Thanks for the KIP!
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> I'm +1 (binding)
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> -john
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
>> > >>>>>>>>>>>>> Hi all,
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I am proposing a new optimization to Kafka Streams which
>> > >>>>>>>>>>>>> would greatly reduce the number of idempotent updates
>> > >>>>>>>>>>>>> (or no-ops) in the Kafka Streams DAG.
>> > >>>>>>>>>>>>> A number of users have been interested in this feature,
>> > >>>>>>>>>>>>> so it would be nice to pass this one in.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> For information, the KIP is described below:
>> > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> We aim to make Kafka Streams more efficient by adopting
>> > >>>>>>>>>>>>> the "emit on change" reporting strategy.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Please cast your vote!
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Best, Richard
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> -- -- Guozhang
>> > >>>>>>>>
>> > >>>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> -- -- Guozhang
>> > >>>>>>
>> > >>>>
>> > >>>
>> > >>>
>> > >>> -- -- Guozhang
>> > >>>
>> > >>
>> > >
>> > -----BEGIN PGP SIGNATURE-----
>> >
>> > iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5kC1kACgkQO4miYXKq
>> > /OhHKA/+OewqjX248vjk6GO6Ex/f2kOJuIIDGb4/c0NlTIS/Iyat1+S8N9P58KNP
>> > pg133xwdWHagU7wajYMktoFiPamQ+Cv+PPhr7qz38JdfVAvzpNb8tcsI/wr5apOQ
>> > XNlBsPhQBLtO/JQUve72OqY/TC9unbpBfhA4tvdA/qkLNvDaX542SrZdlwXuqTKH
>> > EBpgEPBrwaqJ5S65KTMs6Fppc5c2V3dWOAC7Ssql30OneUd/RS88oQ07oNkwZwss
>> > tADw+tzXtw8a0C0PGtMoXhLrs9wipEsuGOP8N6uvuQCM7YoIvTyeBf3Cu7jG8NFB
>> > r2caoWY4TZkqCRsrKe37nNbR8KpjkNQBxCZ7nvIJ9B3KCdB0JOFQXwYj1+23z6aX
>> > T1otQ+0ZIg5lzpIFiHCzwzO5mo2VUEYryRvanw/f2S/LaaBIcg83Dz5TJIv8dFcd
>> > mU7Vu1KXtpWTgpg48JkWd9qSwPqBaR+nvbdP/DnStwf9/9n5SSGgcdS83jw/w6RV
>> > N1bX6YlDCFYeIIT14lrsbWiHSZpiFARZ0fn+VBm8DAF0g+mWlX5Hg30yHKujDj+h
>> > qMDZkI2K2eoYRJaUFcS3yvr2RqCtgXMCEr+jrAVGHDaq+Lt4mbEJRZdon3MiF0Ht
>> > WmEiNaQa7Tu5h+8P5Rb05kPAB6ODa7/sC0BxC54uRXLdPnNxQCs=
>> > =nuG1
>> > -----END PGP SIGNATURE-----
>> >
>>
>
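
The emit-on-change rule that Matthias spells out in point (2) of the quoted thread can be sketched roughly as follows. This is a minimal, hypothetical model in plain Java, not the Kafka Streams implementation: the class name `EmitOnChangeSketch`, both method names, and the choice to compare values as serialized byte arrays are assumptions made purely for illustration.

```java
import java.util.Arrays;

/** Hypothetical model of the emit-on-change decision; not Kafka Streams code. */
final class EmitOnChangeSketch {

    private EmitOnChangeSketch() {
    }

    /**
     * Table operation: only the value is compared, so an update with an
     * unchanged value is skipped even if its timestamp differs.
     */
    static boolean shouldEmitTableUpdate(byte[] oldValue, byte[] newValue) {
        return !Arrays.equals(oldValue, newValue);
    }

    /**
     * Windowed aggregation: an advancing timestamp is not treated as
     * idempotent, so the update is emitted even when the value is unchanged.
     */
    static boolean shouldEmitWindowedUpdate(byte[] oldValue, long oldTimestamp,
                                            byte[] newValue, long newTimestamp) {
        return !Arrays.equals(oldValue, newValue) || newTimestamp > oldTimestamp;
    }
}
```

Under this model, a table update carrying the same value with a newer timestamp is skipped, while the same update applied to a windowed aggregation is forwarded.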

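John's invariant from the quoted discussion (the state store, the changelog, and the forwarded result view always hold the same records, and a record discarded as idempotent has no effect at all) can be illustrated with a small in-memory model. Everything here is hypothetical: `MaterializedOperatorSketch`, its fields, and `process` are illustration names, plain collections stand in for the real store, changelog topic, and downstream operator, and timestamps are left out for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical in-memory stand-in for a materialized table operator. */
final class MaterializedOperatorSketch {
    final Map<String, String> store = new HashMap<>();   // internal state store
    final List<String> changelog = new ArrayList<>();    // records appended to the changelog
    final List<String> forwarded = new ArrayList<>();    // records sent downstream

    /** Returns true if the update was applied, false if it was skipped as idempotent. */
    boolean process(String key, String value) {
        if (store.containsKey(key) && Objects.equals(store.get(key), value)) {
            // Idempotent update: touch nothing, so all three views stay identical.
            return false;
        }
        String entry = key + "=" + value;
        store.put(key, value);
        changelog.add(entry);   // same record is logged ...
        forwarded.add(entry);   // ... and forwarded
        return true;
    }
}
```

Because every applied update is written to all three places and every skipped update to none, the changelog and the forwarded stream in this model are always equal, which is the property the thread argues a real operator should preserve.
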
Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Randall Hauch <rh...@gmail.com>.
Just a quick note: I've changed
https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
to
denote that this KIP is still in voting, as it has only received 2 binding
votes. I will also remove the KIP from the AK 2.6.0 release, since the KIP
freeze (May 20) has already passed, meaning even with an additional binding
vote this KIP still would not make the AK 2.6.0 deadline.

Best regards,

Randall

On Sat, Mar 7, 2020 at 4:53 PM Richard Yu <yo...@gmail.com>
wrote:

> Hi Matthias,
>
> Oh, I see. Next time, I will take that into account.
> It looked like at the time there wasn't much contention over the major
> points of the proposal, so I thought I could pass it.
>
> I will also make some last modifications to the KIP.
>
> Thanks for your vote!
>
> Best,
> Richard
>
>
> On Sat, Mar 7, 2020 at 1:00 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > -----BEGIN PGP SIGNED MESSAGE-----
> > Hash: SHA512
> >
> > Richard,
> >
> > you cannot close a KIP as accepted with 2 binding votes. (cf
> > https://cwiki.apache.org/confluence/display/KAFKA/Bylaws)
> >
> > You could only discard the KIP as long as it's not accepted :D
> >
> > However, I am +1 (binding) and thus you can close the VOTE as accepted.
> >
> >
> > Just three minor follow-up comments:
> >
> > (1) In "Reporting Strategies" you mention in point (2) "Emit on update
> > / non-empty content" -- I am not sure what "empty content" would be.
> > This is a little bit confusing. Maybe just remove it?
> >
> >
> > (2) "Design Reasoning"
> >
> > > we have decided that we will forward aggregation results if and
> > > only if the timestamp and the value had not changed
> >
> > This sounds incorrect. If both value and timestamp have not changed,
> > we would skip the update from my understanding?
> >
> > I.e., to phrase it differently: for a table-operation we only consider
> > the value to make a comparison and if the value does not change, we
> > don't emit anything (even if the timestamp changed).
> >
> > For windowed aggregations however, even if the value does not change,
> > but the timestamp advances, we emit, i.e., a changing timestamp is not
> > considered idempotent for this case. (Note that the timestamp can
> > never go backward for this case, because it's computed as the maximum
> > over all input records for the window.)
> >
> >
> > (3) The discussion about stream time is very interesting. I agree that
> > it's an orthogonal concern to this KIP.
> >
> >
> >
> > - -Matthias
> >
> >
> > On 3/6/20 1:52 PM, Richard Yu wrote:
> > > Hi all,
> > >
> > > I have decided to pass this KIP with 2 binding votes and 3
> > > non-binding votes (including mine). I will update KIP status
> > > shortly after this.
> > >
> > > Best, Richard
> > >
> > > On Thu, Mar 5, 2020 at 3:45 PM Richard Yu
> > > <yo...@gmail.com> wrote:
> > >
> > >> Hi all,
> > >>
> > >> Just polling for some last changes on the name. I think that
> > >> since there doesn't seem to be much objection to any major
> > >> changes in the KIP, I will pass it this Friday.
> > >>
> > >> If you feel that we still need some more discussion, please let
> > >> me know. :)
> > >>
> > >> Best, Richard
> > >>
> > >> P.S. Will start working on a PR for this one soon.
> > >>
> > >> On Wed, Mar 4, 2020 at 1:30 PM Guozhang Wang <wa...@gmail.com>
> > >> wrote:
> > >>
> > >>> Regarding the metric name, I was actually trying to be
> > >>> consistent with the node-level `suppression-emit` as I feel
> > >>> this one's characteristics are closer to that. If other folks
> > >>> feel it is better to align with the task-level "dropped-records",
> > >>> I think I can be convinced too.
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>> On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna
> > >>> <br...@confluent.io> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> may I make a non-binding proposal for the metric name? I
> > >>>> would prefer "skipped-idempotent-updates" to be consistent
> > >>>> with the "dropped-records".
> > >>>>
> > >>>> Best, Bruno
> > >>>>
> > >>>> On Tue, Mar 3, 2020 at 11:57 PM Richard Yu
> > >>>> <yo...@gmail.com> wrote:
> > >>>>>
> > >>>>> Hi all,
> > >>>>>
> > >>>>> Thanks for the discussion!
> > >>>>>
> > >>>>> @Guozhang, I will make the corresponding changes to the KIP
> > >>>>> (i.e. renaming the sensor and adding some notes). With the
> > >>>>> current state of things, we are very close. Just need that one
> > >>>>> last binding vote.
> > >>>>>
> > >>>>> @Matthias J. Sax <ma...@confluent.io>  It would be ideal if we
> > >>>>> can also get your last two cents on this as well. Other than
> > >>>>> that, we are good.
> > >>>>>
> > >>>>> Best, Richard
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wa...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hi Bruno, John:
> > >>>>>>
> > >>>>>> 1) That makes sense. If we consider them to be node-specific
> > >>>>>> metrics that only apply to a subset of built-in processor nodes
> > >>>>>> that are irrelevant to alert-relevant metrics (just like
> > >>>>>> suppression-emit (rate | total)), they'd better be per-node
> > >>>>>> instead of per-task and we would not associate such events with
> > >>>>>> a warning. With that in mind, I'd suggest we consider renaming
> > >>>>>> the metric without the `dropped` keyword to distinguish it from
> > >>>>>> the per-task level sensor. How about
> > >>>>>> "idempotent-update-skip (rate | total)"?
> > >>>>>>
> > >>>>>> Also a minor suggestion: we should clarify in the KIP / javadocs
> > >>>>>> which built-in processor nodes would have this metric while
> > >>>>>> others don't.
> > >>>>>>
> > >>>>>> 2) About stream time tracking, there are multiple known issues
> > >>>>>> that we should close to improve our consistency semantics:
> > >>>>>>
> > >>>>>> a. preserve stream time of active tasks across rebalances where
> > >>>>>> they may be migrated. This is what KAFKA-9368
> > >>>>>> <https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
> > >>>>>> b. preserve stream time of standby tasks to be aligned with the
> > >>>>>> active tasks, via the changelog topics.
> > >>>>>>
> > >>>>>> And what I'm more concerned about is b) here. For example: let's
> > >>>>>> say we have a topology of `source -> A -> repartition -> B`
> > >>>>>> where both A and B have states along with changelogs, and both
> > >>>>>> of them have standbys. If a record is piped from the source and
> > >>>>>> completely traversed through the topology, we need to make sure
> > >>>>>> that the stream time inferred across:
> > >>>>>>
> > >>>>>> * active task A (inferred from the source record),
> > >>>>>> * active task B (inferred from the derived record from the
> > >>>>>>   repartition topic),
> > >>>>>> * standby task A (inferred from the changelog topic of A's store),
> > >>>>>> * standby task B (inferred from the changelog topic of B's store)
> > >>>>>>
> > >>>>>> are consistent (note I'm not saying they should be "exactly the
> > >>>>>> same", but consistent, meaning that they may have different
> > >>>>>> values but as long as that does not impact the time-based
> > >>>>>> queries, it is fine). The main motivation is that on IQ, where
> > >>>>>> both active and standby tasks could be accessed, we can
> > >>>>>> eventually improve our consistency guarantee to have 1)
> > >>>>>> read-your-write, 2) consistency across stores, etc.
> > >>>>>>
> > >>>>>> I agree with John's assessment in the previous email; this is
> > >>>>>> just to clarify more concretely what I'm thinking.
> > >>>>>>
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>> On Tue, Mar 3, 2020 at 9:03 AM John Roesler
> > >>>>>> <vv...@apache.org>
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Thanks, Guozhang and Bruno!
> > >>>>>>>
> > >>>>>>> 2) I had a similar thought to both of you about the
> > >>>>>>> metrics, but I
> > >>>> ultimately
> > >>>>>>> came out with a conclusion like Bruno's. These aren't
> > >>>>>>> dropped
> > >>> invalid
> > >>>>>>> records, they're intentionally dropped, valid, but
> > >>>>>>> unnecessary,
> > >>>> updates.
> > >>>>>>> A "warning" for this case definitely seems wrong, and
> > >>>>>>> I'd also not recommend counting these events along with
> > >>>>>>> "dropped-records", because those
> > >>> are
> > >>>>>>> all dropped invalid records, e.g., late or null-keyed
> > >>>>>>> or couldn't
> > >>> be
> > >>>>>>> deserialized.
> > >>>>>>>
> > >>>>>>> Like Bruno pointed out, an operator should be concerned
> > >>>>>>> to see non-zero "dropped-records", and would then
> > >>>>>>> consult the logs for
> > >>>> warnings.
> > >>>>>>> But that same person should be happy to see
> > >>>> "dropped-idempotent-updates"
> > >>>>>>> increasing, since it means they're saving time and
> > >>>>>>> money. Maybe
> > >>> the
> > >>>> name
> > >>>>>>> of the metric could be different, but I couldn't think
> > >>>>>>> of a better
> > >>>> one.
> > >>>>>>> OTOH, maybe it just stands out to us because we
> > >>>>>>> recently discussed those
> > >>>> other
> > >>>>>>> metrics in KIP-444?
> > >>>>>>>
> > >>>>>>> 1) Maybe we should discuss this point more. It seems
> > >>>>>>> like we should
> > >>>> maintain
> > >>>>>>> an invariant that the following three objects always
> > >>>>>>> have exactly
> > >>> the
> > >>>>>> same
> > >>>>>>> state (modulo flush boundaries):
> > >>>>>>> 1. The internal state store
> > >>>>>>> 2. The changelog
> > >>>>>>> 3. The operation's result view
> > >>>>>>>
> > >>>>>>> That is, if I have a materialized Filter, then it seems
> > >>>>>>> like I
> > >>> _must_
> > >>>>>> store
> > >>>>>>> exactly the same record in the store and the changelog,
> > >>>>>>> and also
> > >>>> forward
> > >>>>>>> the exact same record, including the timestamp, to the
> > >>>>>>> downstream operations.
> > >>>>>>>
> > >>>>>>> If we store something different in the internal state
> > >>>>>>> store than
> > >>> the
> > >>>>>>> changelog, we can get a situation where the state is
> > >>>>>>> actually
> > >>>> different
> > >>>>>>> after restoration than it is during processing, and
> > >>>>>>> queries against
> > >>>> standbys
> > >>>>>>> would return different results than queries against the
> > >>>>>>> active tasks.
> > >>>>>>>
> > >>>>>>> Regarding storing something different in the
> > >>>>>>> store+changelog than
> > >>> we
> > >>>>>>> forward downstream, consider the following topology:
> > >>>>>>> sourceTable
> > >>>>>>>   .filter(someFilter, Materialized.as("f1"))
> > >>>>>>>   .filter(_ -> true, Materialized.as("f2"))
> > >>>>>>>
> > >>>>>>> If we didn't forward exactly the same data we store,
> > >>>>>>> then
> > >>> querying f2
> > >>>>>>> would return different results than querying f1, which
> > >>>>>>> is clearly
> > >>> not
> > >>>>>>> correct, given the topology.
> > >>>>>>>
> > >>>>>>> It seems like maybe what you have in mind is the
> > >>>>>>> preservation of
> > >>>> stream
> > >>>>>>> time across restart/rebalance? This bug is still open,
> > >>>>>>> actually:
> > >>>>>>> https://issues.apache.org/jira/browse/KAFKA-9368
> > >>>>>>> It seems like solving that bug would be independent of
> > >>>>>>> KIP-557. I.e., KIP-557 makes that bug neither worse nor
> > >>>>>>> better.
> > >>>>>>>
> > >>>>>>> One other thought I had is maybe you were thinking that
> > >>>>>>> operators would update their internally tracked stream
> > >>>>>>> time, but still
> > >>> discard
> > >>>>>>> records? I think that _would_ be a bug. That is, if a
> > >>>>>>> record gets
> > >>>>>> discarded
> > >>>>>>> as idempotent, it should have no effect at all on the
> > >>>>>>> state of the application. Reflecting on my prior
> > >>>>>>> analysis of stream time, most of the cases
> > >>>> where
> > >>>>>> we
> > >>>>>>> track stream time is in Stream aggregations, and in
> > >>>>>>> those cases,
> > >>> if
> > >>>> an
> > >>>>>>> incoming record's timestamp is higher than the previous
> > >>>>>>> stream
> > >>> time,
> > >>>> it
> > >>>>>>> would already not be considered idempotent. So we would
> > >>>>>>> store,
> > >>> log,
> > >>>> and
> > >>>>>>> forward the result with the new timestamp. The only
> > >>>>>>> other case is Suppress. With respect to idempotence,
> > >>>> Suppress is
> > >>>>>>> equivalent to a stateless no-op transformation. All it
> > >>>>>>> does is
> > >>>> collect
> > >>>>>> and
> > >>>>>>> delay updates. It has no memory of what it previously
> > >>>>>>> emitted, so it
> > >>>> wouldn't
> > >>>>>>> be possible for it to check for idempotence anyway.
> > >>>>>>>
> > >>>>>>> Was that what you were thinking? Thanks, -John
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> > >>>>>>>> Hi Guozhang,
> > >>>>>>>>
> > >>>>>>>> I also had the same thought about using the existing
> > >>>> "dropped-records"
> > >>>>>>>> metrics. However, I think in this case it would be
> > >>>>>>>> better to
> > >>> use a
> > >>>> new
> > >>>>>>>> metric because dropped idempotent updates is an
> > >>>>>>>> optimization,
> > >>> they
> > >>>> do
> > >>>>>>>> not represent missed records. The dropped idempotent
> > >>>>>>>> updates in general do not change the result and so do
> > >>>>>>>> not need a warn log message, whereas dropped records
> > >>>>>>>> due to expired windows, serialization errors, or
> > >>>>>>>> lateness might be something concerning that needs a
> > >>>>>>>> warn log message.
> > >>>>>>>>
> > >>>>>>>> Looking at the metrics, you would be happy to see
> > >>>>>>>> "dropped-idempotent-updates" increase, because that
> > >>>>>>>> means
> > >>> Streams
> > >>>> gets
> > >>>>>>>> rid of no-ops downstream, but you would be concerned
> > >>>>>>>> if "dropped-records" would increase, because that
> > >>>>>>>> means your
> > >>> records
> > >>>> or
> > >>>>>>>> the configuration of your app has issues. The
> > >>>>>>>> "dropped-idempotent-updates" metric could also be an
> > >>>>>>>> indication
> > >>>> that
> > >>>>>>>> you could further optimize your setup, by getting rid
> > >>>>>>>> of
> > >>> idempotent
> > >>>>>>>> updates further upstream.
> > >>>>>>>>
> > >>>>>>>> Best, Bruno
> > >>>>>>>>
> > >>>>>>>> On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <
> > >>> wangguoz@gmail.com>
> > >>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hello Richard,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the KIP. I once reviewed it and was
> > >>>>>>>>> concerned about
> > >>>> its
> > >>>>>>> effects
> > >>>>>>>>> on stream time advancing. After reading the updated
> > >>>>>>>>> KIP I
> > >>> think
> > >>>> it
> > >>>>>> has
> > >>>>>>>>> answered a lot of them already.
> > >>>>>>>>>
> > >>>>>>>>> I have a couple minor comments still, otherwise I'm
> > >>>>>>>>> +1:
> > >>>>>>>>>
> > >>>>>>>>> 1) I want to clarify that for operations resulting in
> > >>>>>>>>> KTables (not only aggregations, but consider
> > >>>>>>>>> KTable#filter that may also result in a new KTable), even
> > >>>>>>>>> if we drop emissions to the downstream topics we would
> > >>>>>>>>> still append to the corresponding changelog if the
> > >>>>>>>>> timestamp has changed. This is because the timestamps on
> > >>>>>>>>> the changelog are read by the standby tasks, which rely
> > >>>>>>>>> on them to infer their own stream time advancing.
> > >>>>>>>>>
> > >>>>>>>>> 2) About the metrics, in KIP-444 we are
> > >>>>>>>>> consolidating all
> > >>> types
> > >>>> of
> > >>>>>>>>> scenarios that can cause dropped records to the
> > >>>>>>>>> same metrics:
> > >>>>>>>>>
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > >>>>>>>>>
> > >>>>>>>>> late-records-drop: INFO at processor node level, replaced
> > >>>>>>>>> by INFO task-level "dropped-records".
> > >>>>>>>>>
> > >>>>>>>>> skipped-records: INFO at thread and processor node level,
> > >>>>>>>>> replaced by INFO task-level "dropped-records".
> > >>>>>>>>>
> > >>>>>>>>> expired-window-record-drop: DEBUG at state store level,
> > >>>>>>>>> replaced by INFO task-level "dropped-records".
> > >>>>>>>>>
> > >>>>>>>>> The main idea is that instead of using different metrics
> > >>>>>>>>> to indicate different types of scenarios, users just
> > >>>>>>>>> alert on that single metric. When the alert triggers,
> > >>>>>>>>> they can look into the log4j output for its causes (we
> > >>>>>>>>> made sure that all sensor recordings of this metric would
> > >>>>>>>>> be associated with a log4j warning).
> > >>>>>>>>>
> > >>>>>>>>> So I'd suggest that instead of introducing a new per-node
> > >>>>>>>>> "dropped-idempotent-updates", we just piggy-back on the
> > >>>>>>>>> existing task-level metric; unless we think that
> > >>>>>>>>> idempotent drops are more frequent than others and also
> > >>>>>>>>> that they are not worth a warning log, in which case we
> > >>>>>>>>> can consider breaking this metric down with different
> > >>>>>>>>> tags, for example.
> > >>>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <
> > >>>>>> yohan.richard.yu@gmail.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi all,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for the votes so far! @Matthias or
> > >>>>>>>>>> @Guozhang Wang <gu...@confluent.io> it
> > >>> would
> > >>>> be
> > >>>>>>> great to
> > >>>>>>>>>> also get your input on this KIP.
> > >>>>>>>>>>
> > >>>>>>>>>> It looks to be pretty close to completion, so the
> > >>>>>>>>>> finishing
> > >>>> touches
> > >>>>>>> are all
> > >>>>>>>>>> we need. :)
> > >>>>>>>>>>
> > >>>>>>>>>> Best, Richard
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine
> > >>>>>>>>>> < Ghassan.Yammine@bazaarvoice.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hello all,
> > >>>>>>>>>>>
> > >>>>>>>>>>> +1 (non-binding)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Ghassan
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 3/2/20, 12:43 PM, "Bruno Cadonna"
> > >>>>>>>>>>> <bruno@confluent.io
> > >>>>
> > >>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hi Richard,
> > >>>>>>>>>>>
> > >>>>>>>>>>> +1 (non-binding)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best, Bruno
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Mon, Mar 2, 2020 at 4:33 PM John Roesler <
> > >>>>>>> vvcephei@apache.org>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Hi Richard,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for the KIP!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I'm +1 (binding)
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -john
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Thu, Feb 27, 2020, at 14:40, Richard Yu
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I am proposing a new optimization to Kafka
> > >>>>>>>>>>>>> Streams
> > >>>> which
> > >>>>>>> would
> > >>>>>>>>>>> greatly
> > >>>>>>>>>>>>> reduce the number of idempotent updates
> > >>>>>>>>>>>>> (or
> > >>> no-ops)
> > >>>> in
> > >>>>>> the
> > >>>>>>> Kafka
> > >>>>>>>>>>> Streams
> > >>>>>>>>>>>>> DAG. A number of users have been interested
> > >>>>>>>>>>>>> in this
> > >>>> feature,
> > >>>>>> so
> > >>>>>>> it
> > >>>>>>>>>>> would be nice
> > >>>>>>>>>>>>> to pass this one in.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> For information, the KIP is described
> > >>>>>>>>>>>>> below:
> > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> We aim to make Kafka Streams more efficient
> > >>>>>>>>>>>>> by
> > >>>> adopting
> > >>>>>>> the "emit
> > >>>>>>>>>>> on
> > >>>>>>>>>>>>> change" reporting strategy.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Please cast your vote!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Best, Richard
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> -- -- Guozhang
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> -- -- Guozhang
> > >>>>>>
> > >>>>
> > >>>
> > >>>
> > >>> -- -- Guozhang
> > >>>
> > >>
> > >
> > -----BEGIN PGP SIGNATURE-----
> >
> > iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5kC1kACgkQO4miYXKq
> > /OhHKA/+OewqjX248vjk6GO6Ex/f2kOJuIIDGb4/c0NlTIS/Iyat1+S8N9P58KNP
> > pg133xwdWHagU7wajYMktoFiPamQ+Cv+PPhr7qz38JdfVAvzpNb8tcsI/wr5apOQ
> > XNlBsPhQBLtO/JQUve72OqY/TC9unbpBfhA4tvdA/qkLNvDaX542SrZdlwXuqTKH
> > EBpgEPBrwaqJ5S65KTMs6Fppc5c2V3dWOAC7Ssql30OneUd/RS88oQ07oNkwZwss
> > tADw+tzXtw8a0C0PGtMoXhLrs9wipEsuGOP8N6uvuQCM7YoIvTyeBf3Cu7jG8NFB
> > r2caoWY4TZkqCRsrKe37nNbR8KpjkNQBxCZ7nvIJ9B3KCdB0JOFQXwYj1+23z6aX
> > T1otQ+0ZIg5lzpIFiHCzwzO5mo2VUEYryRvanw/f2S/LaaBIcg83Dz5TJIv8dFcd
> > mU7Vu1KXtpWTgpg48JkWd9qSwPqBaR+nvbdP/DnStwf9/9n5SSGgcdS83jw/w6RV
> > N1bX6YlDCFYeIIT14lrsbWiHSZpiFARZ0fn+VBm8DAF0g+mWlX5Hg30yHKujDj+h
> > qMDZkI2K2eoYRJaUFcS3yvr2RqCtgXMCEr+jrAVGHDaq+Lt4mbEJRZdon3MiF0Ht
> > WmEiNaQa7Tu5h+8P5Rb05kPAB6ODa7/sC0BxC54uRXLdPnNxQCs=
> > =nuG1
> > -----END PGP SIGNATURE-----
> >
>
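
For readers following the thread, the comparison rule under discussion
can be sketched in a few lines of Java. This is a minimal illustration,
not the Kafka Streams implementation: the class and method names are
hypothetical, and comparing serialized values as byte arrays is an
assumption made for the example. It shows the two cases described in
the thread: a table operation compares only the value, while a windowed
aggregation also treats an advancing timestamp as a change.

```java
import java.util.Arrays;

// Hypothetical illustration only; not the Kafka Streams implementation.
final class EmitOnChangeSketch {

    // Table operation: only the (serialized) value is compared, so an
    // update with an unchanged value is dropped even if its timestamp differs.
    public static boolean shouldEmitTableUpdate(byte[] oldValue, byte[] newValue) {
        return !Arrays.equals(oldValue, newValue);
    }

    // Windowed aggregation: an advancing timestamp is not idempotent, so the
    // update is emitted when either the value changed or the timestamp grew.
    public static boolean shouldEmitWindowedUpdate(byte[] oldValue, long oldTimestamp,
                                                   byte[] newValue, long newTimestamp) {
        return !Arrays.equals(oldValue, newValue) || newTimestamp > oldTimestamp;
    }

    public static void main(String[] args) {
        byte[] previous = {1, 2, 3};
        byte[] unchanged = {1, 2, 3};
        // Same value: the table update would be skipped.
        System.out.println(shouldEmitTableUpdate(previous, unchanged));
        // Same value but a later timestamp: the windowed update would be emitted.
        System.out.println(shouldEmitWindowedUpdate(previous, 10L, unchanged, 11L));
    }
}
```

A real operator would also have to decide what to do with the skipped
update (for example, record it in a metric); that part is left out here.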

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Richard Yu <yo...@gmail.com>.
Hi Matthias,

Oh, I see. Next time, I will take that into account.
It looked like at the time there wasn't much contention over the major
points of the proposal, so I thought I could pass it.

I will also make some last modifications to the KIP.

Thanks for your vote!

Best,
Richard


On Sat, Mar 7, 2020 at 1:00 PM Matthias J. Sax <mj...@apache.org> wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> Richard,
>
> you cannot close a KIP as accepted with 2 binding votes. (cf
> https://cwiki.apache.org/confluence/display/KAFKA/Bylaws)
>
> You could only discard the KIP as long as it's not accepted :D
>
> However, I am +1 (binding) and thus you can close the VOTE as accepted.
>
>
> Just three minor follow-up comments:
>
> (1) In "Reporting Strategies" you mention in point (2) "Emit on update
> / non-empty content" -- I am not sure what "empty content" would be.
> This is a little bit confusing. Maybe just remove it?
>
>
> (2) "Design Reasoning"
>
> > we have decided that we will forward aggregation results if and
> > only if the timestamp and the value had not changed
>
> This sounds incorrect. If both value and timestamp have not changed,
> we would skip the update from my understanding?
>
> I.e., to phrase it differently: for a table operation we only consider
> the value to make a comparison and if the value does not change, we
> don't emit anything (even if the timestamp changed).
>
> For windowed aggregations however, even if the value does not change,
> but the timestamp advances, we emit, ie, a changing timestamp is not
> considered idempotent for this case. (Note, that the timestamp can
> never go backward for this case, because it's computed as the maximum
> over all input records for the window).
>
>
> (3) The discussion about stream time is very interesting. I agree that
> it's an orthogonal concern to this KIP.
>
>
>
> - -Matthias
>
>
> On 3/6/20 1:52 PM, Richard Yu wrote:
> > Hi all,
> >
> > I have decided to pass this KIP with 2 binding votes and 3
> > non-binding votes (including mine). I will update KIP status
> > shortly after this.
> >
> > Best, Richard
> >
> > On Thu, Mar 5, 2020 at 3:45 PM Richard Yu
> > <yo...@gmail.com> wrote:
> >
> >> Hi all,
> >>
> >> Just polling for some last changes on the name. I think that
> >> since there doesn't seem to be much objection to any major
> >> changes in the KIP, I will pass it this Friday.
> >>
> >> If you feel that we still need some more discussion, please let
> >> me know. :)
> >>
> >> Best, Richard
> >>
> >> P.S. Will start working on a PR for this one soon.
> >>
> >> On Wed, Mar 4, 2020 at 1:30 PM Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >>
> >>> Regarding the metric name, I was actually trying to be
> >>> consistent with the node-level `suppression-emit`, as I feel
> >>> this one's characteristics are closer to that. If other folks
> >>> feel it is better to align with the task-level
> >>> "dropped-records", I think I can be convinced too.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna
> >>> <br...@confluent.io> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> may I make a non-binding proposal for the metric name? I
> >>>> would prefer "skipped-idempotent-updates" to be consistent
> >>>> with the "dropped-records".
> >>>>
> >>>> Best, Bruno
> >>>>
> >>>> On Tue, Mar 3, 2020 at 11:57 PM Richard Yu
> >>>> <yo...@gmail.com> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> Thanks for the discussion!
> >>>>>
> >>>>> @Guozhang, I will make the corresponding changes to the KIP
> >>>>> (i.e. renaming the sensor and adding some notes).
> >>>>> With the current state of things, we are very close. Just
> >>>>> need that one last binding vote.
> >>>>>
> >>>>> @Matthias J. Sax <ma...@confluent.io>  It would be ideal
> >>>>> if we can also get your last two cents on this as well.
> >>>>> Other than that, we are good.
> >>>>>
> >>>>> Best, Richard
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang
> >>>>> <wa...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> Hi Bruno, John:
> >>>>>>
> >>>>>> 1) That makes sense. If we consider them to be
> >>>>>> node-specific metrics that only apply to a subset of
> >>>>>> built-in processor nodes and are unrelated to the
> >>>>>> alert-relevant metrics (just like suppression-emit (rate |
> >>>>>> total)), they'd better be per-node instead of per-task, and
> >>>>>> we would not associate such events with a warning. With
> >>>>>> that in mind, I'd suggest we consider renaming the metric
> >>>>>> without the `dropped` keyword to distinguish it from the
> >>>>>> per-task level sensor. How about "idempotent-update-skip
> >>>>>> (rate | total)"?
> >>>>>>
> >>>>>> Also a minor suggestion: we should clarify in the KIP /
> >>>>>> javadocs
> >>> which
> >>>>>> built-in processor nodes would have this metric while
> >>>>>> others don't.
> >>>>>>
> >>>>>> 2) About stream time tracking, there are multiple known
> >>>>>> issues that
> >>> we
> >>>>>> should close to improve our consistency semantics:
> >>>>>>
> >>>>>> a. preserve stream time of active tasks across rebalances
> >>>>>>    where they may be migrated. This is what KAFKA-9368
> >>>>>>    <https://issues.apache.org/jira/browse/KAFKA-9368> meant
> >>>>>>    for.
> >>>>>> b. preserve stream time of standby tasks to be aligned with
> >>>>>>    the active tasks, via the changelog topics.
> >>>>>>
> >>>>>> And what I'm more concerned about is b) here. For example:
> >>>>>> let's say we have a topology of
> >>>>>> `source -> A -> repartition -> B` where both A and B have
> >>>>>> states along with changelogs, and both of them have
> >>>>>> standbys. If a record is piped from the source and has
> >>>>>> completely traversed the topology, we need to make sure
> >>>>>> that the stream time inferred across:
> >>>>>>
> >>>>>> * active task A (inferred from the source record),
> >>>>>> * active task B (inferred from the derived record from
> >>>>>>   repartition topic),
> >>>>>> * standby task A (inferred from the changelog topic of
> >>>>>>   A's store),
> >>>>>> * standby task B (inferred from the changelog topic of
> >>>>>>   B's store)
> >>>>>>
> >>>>>> are consistent (note I'm not saying they should be
> >>>>>> "exactly the
> >>> same",
> >>>> but
> >>>>>> consistent, meaning that they may have different values
> >>>>>> but as long
> >>> as
> >>>> that
> >>>>>> does not impact the time-based queries, it is fine). The
> >>>>>> main
> >>>> motivation is
> >>>>>> that on IQ, where both active and standby tasks could be
> >>>>>> accessed,
> >>> we
> >>>> can
> >>>>>> eventually improve our consistency guarantee to have 1)
> >>>> read-your-write, 2)
> >>>>>> consistency across stores, etc.
> >>>>>>
> >>>>>> I agree with John's assessment in the previous email, and
> >>>>>> just to
> >>>> clarify
> >>>>>> more concretely what I'm thinking.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Mar 3, 2020 at 9:03 AM John Roesler
> >>>>>> <vv...@apache.org>
> >>>> wrote:
> >>>>>>
> >>>>>>> Thanks, Guozhang and Bruno!
> >>>>>>>
> >>>>>>> 2) I had a similar thought to both of you about the
> >>>>>>> metrics, but I
> >>>> ultimately
> >>>>>>> came out with a conclusion like Bruno's. These aren't
> >>>>>>> dropped
> >>> invalid
> >>>>>>> records, they're intentionally dropped, valid, but
> >>>>>>> unnecessary,
> >>>> updates.
> >>>>>>> A "warning" for this case definitely seems wrong, and
> >>>>>>> I'd also not recommend counting these events along with
> >>>>>>> "dropped-records", because those
> >>> are
> >>>>>>> all dropped invalid records, e.g., late or null-keyed
> >>>>>>> or couldn't
> >>> be
> >>>>>>> deserialized.
> >>>>>>>
> >>>>>>> Like Bruno pointed out, an operator should be concerned
> >>>>>>> to see non-zero "dropped-records", and would then
> >>>>>>> consult the logs for
> >>>> warnings.
> >>>>>>> But that same person should be happy to see
> >>>> "dropped-idempotent-updates"
> >>>>>>> increasing, since it means they're saving time and
> >>>>>>> money. Maybe
> >>> the
> >>>> name
> >>>>>>> of the metric could be different, but I couldn't think
> >>>>>>> of a better
> >>>> one.
> >>>>>>> OTOH, maybe it just stands out to us because we
> >>>>>>> recently discussed those
> >>>> other
> >>>>>>> metrics in KIP-444?
> >>>>>>>
> >>>>>>> 1) Maybe we should discuss this point more. It seems
> >>>>>>> like we should
> >>>> maintain
> >>>>>>> an invariant that the following three objects always
> >>>>>>> have exactly
> >>> the
> >>>>>> same
> >>>>>>> state (modulo flush boundaries):
> >>>>>>> 1. The internal state store
> >>>>>>> 2. The changelog
> >>>>>>> 3. The operation's result view
> >>>>>>>
> >>>>>>> That is, if I have a materialized Filter, then it seems
> >>>>>>> like I
> >>> _must_
> >>>>>> store
> >>>>>>> exactly the same record in the store and the changelog,
> >>>>>>> and also
> >>>> forward
> >>>>>>> the exact same record, including the timestamp, to the
> >>>>>>> downstream operations.
> >>>>>>>
> >>>>>>> If we store something different in the internal state
> >>>>>>> store than
> >>> the
> >>>>>>> changelog, we can get a situation where the state is
> >>>>>>> actually
> >>>> different
> >>>>>>> after restoration than it is during processing, and
> >>>>>>> queries against
> >>>> standbys
> >>>>>>> would return different results than queries against the
> >>>>>>> active tasks.
> >>>>>>>
> >>>>>>> Regarding storing something different in the
> >>>>>>> store+changelog than
> >>> we
> >>>>>>> forward downstream, consider the following topology:
> >>>>>>> sourceTable
> >>>>>>>   .filter(someFilter, Materialized.as("f1"))
> >>>>>>>   .filter(_ -> true, Materialized.as("f2"))
> >>>>>>>
> >>>>>>> If we didn't forward exactly the same data we store,
> >>>>>>> then
> >>> querying f2
> >>>>>>> would return different results than querying f1, which
> >>>>>>> is clearly
> >>> not
> >>>>>>> correct, given the topology.
> >>>>>>>
> >>>>>>> It seems like maybe what you have in mind is the
> >>>>>>> preservation of
> >>>> stream
> >>>>>>> time across restart/rebalance? This bug is still open,
> >>>>>>> actually:
> >>>>>>> https://issues.apache.org/jira/browse/KAFKA-9368
> >>>>>>> It seems like solving that bug would be independent of
> >>>>>>> KIP-557. I.e., KIP-557 makes that bug neither worse nor
> >>>>>>> better.
> >>>>>>>
> >>>>>>> One other thought I had is maybe you were thinking that
> >>>>>>> operators would update their internally tracked stream
> >>>>>>> time, but still
> >>> discard
> >>>>>>> records? I think that _would_ be a bug. That is, if a
> >>>>>>> record gets
> >>>>>> discarded
> >>>>>>> as idempotent, it should have no effect at all on the
> >>>>>>> state of the application. Reflecting on my prior
> >>>>>>> analysis of stream time, most of the cases
> >>>> where
> >>>>>> we
> >>>>>>> track stream time is in Stream aggregations, and in
> >>>>>>> those cases,
> >>> if
> >>>> an
> >>>>>>> incoming record's timestamp is higher than the previous
> >>>>>>> stream
> >>> time,
> >>>> it
> >>>>>>> would already not be considered idempotent. So we would
> >>>>>>> store,
> >>> log,
> >>>> and
> >>>>>>> forward the result with the new timestamp. The only
> >>>>>>> other case is Suppress. With respect to idempotence,
> >>>> Suppress is
> >>>>>>> equivalent to a stateless no-op transformation. All it
> >>>>>>> does is
> >>>> collect
> >>>>>> and
> >>>>>>> delay updates. It has no memory of what it previously
> >>>>>>> emitted, so it
> >>>> wouldn't
> >>>>>>> be possible for it to check for idempotence anyway.
> >>>>>>>
> >>>>>>> Was that what you were thinking? Thanks, -John
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> >>>>>>>> Hi Guozhang,
> >>>>>>>>
> >>>>>>>> I also had the same thought about using the existing
> >>>> "dropped-records"
> >>>>>>>> metrics. However, I think in this case it would be
> >>>>>>>> better to
> >>> use a
> >>>> new
> >>>>>>>> metric because dropped idempotent updates is an
> >>>>>>>> optimization,
> >>> they
> >>>> do
> >>>>>>>> not represent missed records. The dropped idempotent
> >>>>>>>> updates in general do not change the result and so do
> >>>>>>>> not need a warn log message, whereas dropped records
> >>>>>>>> due to expired windows, serialization errors, or
> >>>>>>>> lateness might be something concerning that needs a
> >>>>>>>> warn log message.
> >>>>>>>>
> >>>>>>>> Looking at the metrics, you would be happy to see
> >>>>>>>> "dropped-idempotent-updates" increase, because that
> >>>>>>>> means
> >>> Streams
> >>>> gets
> >>>>>>>> rid of no-ops downstream, but you would be concerned
> >>>>>>>> if "dropped-records" would increase, because that
> >>>>>>>> means your
> >>> records
> >>>> or
> >>>>>>>> the configuration of your app has issues. The
> >>>>>>>> "dropped-idempotent-updates" metric could also be an
> >>>>>>>> indication
> >>>> that
> >>>>>>>> you could further optimize your setup, by getting rid
> >>>>>>>> of
> >>> idempotent
> >>>>>>>> updates further upstream.
> >>>>>>>>
> >>>>>>>> Best, Bruno
> >>>>>>>>
> >>>>>>>> On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <
> >>> wangguoz@gmail.com>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hello Richard,
> >>>>>>>>>
> >>>>>>>>> Thanks for the KIP. I once reviewed it and was
> >>>>>>>>> concerned about
> >>>> its
> >>>>>>> effects
> >>>>>>>>> on stream time advancing. After reading the updated
> >>>>>>>>> KIP I
> >>> think
> >>>> it
> >>>>>> has
> >>>>>>>>> answered a lot of them already.
> >>>>>>>>>
> >>>>>>>>> I have a couple minor comments still, otherwise I'm
> >>>>>>>>> +1:
> >>>>>>>>>
> >>>>>>>>> 1) I want to clarify that for operations resulting in
> >>>>>>>>> KTables (not only aggregations, but consider
> >>>>>>>>> KTable#filter that may also result in a new KTable), even
> >>>>>>>>> if we drop emissions to the downstream topics we would
> >>>>>>>>> still append to the corresponding changelog if the
> >>>>>>>>> timestamp has changed. This is because the timestamps on
> >>>>>>>>> the changelog are read by the standby tasks, which rely
> >>>>>>>>> on them to infer their own stream time advancing.
> >>>>>>>>>
> >>>>>>>>> 2) About the metrics, in KIP-444 we are
> >>>>>>>>> consolidating all
> >>> types
> >>>> of
> >>>>>>>>> scenarios that can cause dropped records to the
> >>>>>>>>> same metrics:
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >>>>>>>>>
> >>>>>>>>> late-records-drop: INFO at processor node level, replaced
> >>>>>>>>> by INFO task-level "dropped-records".
> >>>>>>>>>
> >>>>>>>>> skipped-records: INFO at thread and processor node level,
> >>>>>>>>> replaced by INFO task-level "dropped-records".
> >>>>>>>>>
> >>>>>>>>> expired-window-record-drop: DEBUG at state store level,
> >>>>>>>>> replaced by INFO task-level "dropped-records".
> >>>>>>>>>
> >>>>>>>>> The main idea is that instead of using different metrics
> >>>>>>>>> to indicate different types of scenarios, users just
> >>>>>>>>> alert on that single metric. When the alert triggers,
> >>>>>>>>> they can look into the log4j output for its causes (we
> >>>>>>>>> made sure that all sensor recordings of this metric would
> >>>>>>>>> be associated with a log4j warning).
> >>>>>>>>>
> >>>>>>>>> So I'd suggest that instead of introducing a new per-node
> >>>>>>>>> "dropped-idempotent-updates", we just piggy-back on the
> >>>>>>>>> existing task-level metric; unless we think that
> >>>>>>>>> idempotent drops are more frequent than others and also
> >>>>>>>>> that they are not worth a warning log, in which case we
> >>>>>>>>> can consider breaking this metric down with different
> >>>>>>>>> tags, for example.
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <
> >>>>>> yohan.richard.yu@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the votes so far! @Matthias or
> >>>>>>>>>> @Guozhang Wang <gu...@confluent.io> it
> >>> would
> >>>> be
> >>>>>>> great to
> >>>>>>>>>> also get your input on this KIP.
> >>>>>>>>>>
> >>>>>>>>>> It looks to be pretty close to completion, so the
> >>>>>>>>>> finishing
> >>>> touches
> >>>>>>> are all
> >>>>>>>>>> we need. :)
> >>>>>>>>>>
> >>>>>>>>>> Best, Richard
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine
> >>>>>>>>>> < Ghassan.Yammine@bazaarvoice.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello all,
> >>>>>>>>>>>
> >>>>>>>>>>> +1 (non-binding)
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Ghassan
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/2/20, 12:43 PM, "Bruno Cadonna"
> >>>>>>>>>>> <bruno@confluent.io
> >>>>
> >>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Richard,
> >>>>>>>>>>>
> >>>>>>>>>>> +1 (non-binding)
> >>>>>>>>>>>
> >>>>>>>>>>> Best, Bruno
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Mar 2, 2020 at 4:33 PM John Roesler <
> >>>>>>> vvcephei@apache.org>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Richard,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP!
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm +1 (binding)
> >>>>>>>>>>>>
> >>>>>>>>>>>> -john
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Feb 27, 2020, at 14:40, Richard Yu
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am proposing a new optimization to Kafka
> >>>>>>>>>>>>> Streams
> >>>> which
> >>>>>>> would
> >>>>>>>>>>> greatly
> >>>>>>>>>>>>> reduce the number of idempotent updates
> >>>>>>>>>>>>> (or
> >>> no-ops)
> >>>> in
> >>>>>> the
> >>>>>>> Kafka
> >>>>>>>>>>> Streams
> >>>>>>>>>>>>> DAG. A number of users have been interested
> >>>>>>>>>>>>> in this
> >>>> feature,
> >>>>>> so
> >>>>>>> it
> >>>>>>>>>>> would be nice
> >>>>>>>>>>>>> to pass this one in.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For information, the KIP is described
> >>>>>>>>>>>>> below:
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >>>>>>>>>>>
> >>>
> >>
> >>>>>>>>>>>>> We aim to make Kafka Streams more efficient
> >>>>>>>>>>>>> by
> >>>> adopting
> >>>>>>> the "emit
> >>>>>>>>>>> on
> >>>>>>>>>>>>> change" reporting strategy.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Please cast your vote!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best, Richard
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -- -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> -- -- Guozhang
> >>>>>>
> >>>>
> >>>
> >>>
> >>> -- -- Guozhang
> >>>
> >>
> >
> -----BEGIN PGP SIGNATURE-----
>
> iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5kC1kACgkQO4miYXKq
> /OhHKA/+OewqjX248vjk6GO6Ex/f2kOJuIIDGb4/c0NlTIS/Iyat1+S8N9P58KNP
> pg133xwdWHagU7wajYMktoFiPamQ+Cv+PPhr7qz38JdfVAvzpNb8tcsI/wr5apOQ
> XNlBsPhQBLtO/JQUve72OqY/TC9unbpBfhA4tvdA/qkLNvDaX542SrZdlwXuqTKH
> EBpgEPBrwaqJ5S65KTMs6Fppc5c2V3dWOAC7Ssql30OneUd/RS88oQ07oNkwZwss
> tADw+tzXtw8a0C0PGtMoXhLrs9wipEsuGOP8N6uvuQCM7YoIvTyeBf3Cu7jG8NFB
> r2caoWY4TZkqCRsrKe37nNbR8KpjkNQBxCZ7nvIJ9B3KCdB0JOFQXwYj1+23z6aX
> T1otQ+0ZIg5lzpIFiHCzwzO5mo2VUEYryRvanw/f2S/LaaBIcg83Dz5TJIv8dFcd
> mU7Vu1KXtpWTgpg48JkWd9qSwPqBaR+nvbdP/DnStwf9/9n5SSGgcdS83jw/w6RV
> N1bX6YlDCFYeIIT14lrsbWiHSZpiFARZ0fn+VBm8DAF0g+mWlX5Hg30yHKujDj+h
> qMDZkI2K2eoYRJaUFcS3yvr2RqCtgXMCEr+jrAVGHDaq+Lt4mbEJRZdon3MiF0Ht
> WmEiNaQa7Tu5h+8P5Rb05kPAB6ODa7/sC0BxC54uRXLdPnNxQCs=
> =nuG1
> -----END PGP SIGNATURE-----
>

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by "Matthias J. Sax" <mj...@apache.org>.
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Richard,

you cannot close a KIP as accepted with 2 binding votes. (cf.
https://cwiki.apache.org/confluence/display/KAFKA/Bylaws)

You could only discard the KIP as long as it's not accepted :D

However, I am +1 (binding) and thus you can close the VOTE as accepted.


Just three minor follow-up comments:

(1) In "Reporting Strategies" you mention in point (2) "Emit on update
/ non-empty content" -- I am not sure what "empty content" would be.
This is a little bit confusing. Maybe just remove it?


(2) "Design Reasoning"

> we have decided that we will forward aggregation results if and
> only if the timestamp and the value had not changed

This sounds incorrect. If both value and timestamp have not changed,
we would skip the update from my understanding?

I.e., to phrase it differently: for a table operation we only consider
the value to make a comparison, and if the value does not change, we
don't emit anything (even if the timestamp changed).

For windowed aggregations, however, even if the value does not change
but the timestamp advances, we emit; i.e., a changing timestamp is not
considered idempotent for this case. (Note that the timestamp can
never go backward for this case, because it's computed as the maximum
over all input records for the window.)
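
As a rough illustration of the two rules above, here is a minimal
sketch. It is not Kafka Streams code: the class and method names are
made up for this example, and it only assumes that values can be
compared with equals().

```java
import java.util.Objects;

// Hypothetical sketch of the emit-on-change decision; not part of Kafka Streams.
final class EmitOnChangeSketch {

    // Table operation: only the value is compared. A changed timestamp
    // alone does not cause an emit.
    static <V> boolean shouldEmitTableUpdate(V oldValue, V newValue) {
        return !Objects.equals(oldValue, newValue);
    }

    // Windowed aggregation: the window timestamp is the maximum over all
    // input records, so it can only stay the same or advance.
    static long windowTimestamp(long currentTimestamp, long recordTimestamp) {
        return Math.max(currentTimestamp, recordTimestamp);
    }

    // Windowed aggregation: emit if the value changed or the timestamp advanced.
    static <V> boolean shouldEmitWindowedUpdate(V oldValue, long oldTimestamp,
                                                V newValue, long newTimestamp) {
        return !Objects.equals(oldValue, newValue) || newTimestamp > oldTimestamp;
    }

    public static void main(String[] args) {
        // Same value, newer timestamp: skipped for a table operation.
        System.out.println(shouldEmitTableUpdate("a", "a"));
        // Same value, but the window timestamp advances from 100 to 200: emitted.
        long advanced = windowTimestamp(100L, 200L);
        System.out.println(shouldEmitWindowedUpdate(5L, 100L, 5L, advanced));
        // Same value and an older record (timestamp stays at 100): skipped.
        long unchanged = windowTimestamp(100L, 50L);
        System.out.println(shouldEmitWindowedUpdate(5L, 100L, 5L, unchanged));
    }
}
```

The point of the sketch is only that the two cases use different
notions of "no change": value equality for table operations, value
equality plus an unchanged timestamp for windowed aggregations.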


(3) The discussion about stream time is very interesting. I agree that
it's an orthogonal concern to this KIP.



- -Matthias


On 3/6/20 1:52 PM, Richard Yu wrote:
> Hi all,
>
> I have decided to pass this KIP with 2 binding votes and 3
> non-binding votes (including mine). I will update KIP status
> shortly after this.
>
> Best, Richard
>
> On Thu, Mar 5, 2020 at 3:45 PM Richard Yu
> <yo...@gmail.com> wrote:
>
>> Hi all,
>>
>> Just polling for some last changes on the name. I think that
>> since there doesn't seem to be much objection to any major
>> changes in the KIP, I will pass it this Friday.
>>
>> If you feel that we still need some more discussion, please let
>> me know. :)
>>
>> Best, Richard
>>
>> P.S. Will start working on a PR for this one soon.
>>
>> On Wed, Mar 4, 2020 at 1:30 PM Guozhang Wang <wa...@gmail.com>
>> wrote:
>>
>>> Regarding the metric name, I was actually trying to be
>>> consistent with the node-level `suppression-emit` as I feel
>>> this one's characteristics are closer to that. If other folks
>>> feel it is better to align with the task-level "dropped-records",
>>> I think I can be convinced too.
>>>
>>>
>>> Guozhang
>>>
>>> On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna
>>> <br...@confluent.io> wrote:
>>>
>>>> Hi all,
>>>>
>>>> may I make a non-binding proposal for the metric name? I
>>>> would prefer "skipped-idempotent-updates" to be consistent
>>>> with the "dropped-records".
>>>>
>>>> Best, Bruno
>>>>
>>>> On Tue, Mar 3, 2020 at 11:57 PM Richard Yu
>>>> <yo...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks for the discussion!
>>>>>
>>>>> @Guozhang, I will make the corresponding changes to the KIP
>>>>> (i.e.
>>>> renaming
>>>>> the sensor and adding some notes). With the current state
>>>>> of things, we are very close. Just need that
>>> one
>>>>> last binding vote.
>>>>>
>>>>> @Matthias J. Sax <ma...@confluent.io>  It would be ideal
>>>>> if we can
>>>> also
>>>>> get your last two cents on this as well. Other than that,
>>>>> we are good.
>>>>>
>>>>> Best, Richard
>>>>>
>>>>>
>>>>> On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang
>>>>> <wa...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Hi Bruno, John:
>>>>>>
>>>>>> 1) That makes sense. If we consider them to be
>>>>>> node-specific metrics
>>>> that
>>>>>> only applies to a subset of built-in processor nodes that
>>>>>> are
>>>> irrelevant to
>>>>>> alert-relevant metrics (just like suppression-emit (rate
>>>>>> | total)),
>>>> they'd
>>>>>> better be per-node instead of per-task and we would not
>>>>>> associate
>>> such
>>>>>> events with warning. With that in mind, I'd suggest we
>>>>>> consider
>>>> renaming
>>>>>> the metric without the `dropped` keyword to distinguish
>>>>>> it with the per-task level sensor. How about
>>>>>> "idempotent-update-skip (rate |
>>>> total)"?
>>>>>>
>>>>>> Also a minor suggestion: we should clarify in the KIP /
>>>>>> javadocs
>>> which
>>>>>> built-in processor nodes would have this metric while
>>>>>> others don't.
>>>>>>
>>>>>> 2) About stream time tracking, there are multiple known
>>>>>> issues that
>>> we
>>>>>> should close to improve our consistency semantics:
>>>>>>
>>>>>> a. preserve stream time of active tasks across rebalances where
>>>>>> they may be migrated. This is what KAFKA-9368
>>>>>> <https://issues.apache.org/jira/browse/KAFKA-9368> meant for.
>>>>>> b. preserve stream time of standby tasks to be aligned with the
>>>>>> active tasks, via the changelog topics.
>>>>>>
>>>>>> And what I'm more concerned about is b) here. For example:
>>>>>> let's say we
>>>> have a
>>>>>> topology of `source -> A -> repartition -> B` where both
>>>>>> A and B
>>> have
>>>>>> states along with changelogs, and both of them have
>>>>>> standbys. If a
>>>> record
>>>>>> is piped from the source and completely traversed through
>>>>>> the
>>> topology,
>>>> we
>>>>>> need to make sure that the stream time inferred across:
>>>>>>
>>>>>> * active task A (inferred from the source record),
>>>>>> * active task B (inferred from the derived record from the
>>>>>>   repartition topic),
>>>>>> * standby task A (inferred from the changelog topic of A's store),
>>>>>> * standby task B (inferred from the changelog topic of B's store)
>>>>>>
>>>>>> are consistent (note I'm not saying they should be
>>>>>> "exactly the
>>> same",
>>>> but
>>>>>> consistent, meaning that they may have different values
>>>>>> but as long
>>> as
>>>> that
>>>>>> does not impact the time-based queries, it is fine). The
>>>>>> main
>>>> motivation is
>>>>>> that on IQ, where both active and standby tasks could be
>>>>>> accessed,
>>> we
>>>> can
>>>>>> eventually improve our consistency guarantee to have 1)
>>>> read-your-write, 2)
>>>>>> consistency across stores, etc.
>>>>>>
>>>>>> I agree with John's assessment in the previous email, and
>>>>>> just to
>>>> clarify
>>>>>> more concretely what I'm thinking.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 3, 2020 at 9:03 AM John Roesler
>>>>>> <vv...@apache.org>
>>>> wrote:
>>>>>>
>>>>>>> Thanks, Guozhang and Bruno!
>>>>>>>
>>>>>>> 2) I had a similar thought to both of you about the
>>>>>>> metrics, but I
>>>> ultimately
>>>>>>> came out with a conclusion like Bruno's. These aren't
>>>>>>> dropped
>>> invalid
>>>>>>> records, they're intentionally dropped, valid, but
>>>>>>> unnecessary,
>>>> updates.
>>>>>>> A "warning" for this case definitely seems wrong, and
>>>>>>> I'd also not recommend counting these events along with
>>>>>>> "dropped-records", because those
>>> are
>>>>>>> all dropped invalid records, e.g., late or null-keyed
>>>>>>> or couldn't
>>> be
>>>>>>> deserialized.
>>>>>>>
>>>>>>> Like Bruno pointed out, an operator should be concerned
>>>>>>> to see non-zero "dropped-records", and would then
>>>>>>> consult the logs for
>>>> warnings.
>>>>>>> But that same person should be happy to see
>>>> "dropped-idempotent-updates"
>>>>>>> increasing, since it means they're saving time and
>>>>>>> money. Maybe
>>> the
>>>> name
>>>>>>> of the metric could be different, but I couldn't think
>>>>>>> of a better
>>>> one.
>>>>>>> OTOH, maybe it just stands out to us because we
>>>>>>> recently discussed those
>>>> other
>>>>>>> metrics in KIP-444?
>>>>>>>
>>>>>>> 1) Maybe we should discuss this point more. It seems like we
>>>>>>> should maintain an invariant that the following three objects
>>>>>>> always have exactly the same state (modulo flush boundaries):
>>>>>>> 1. The internal state store
>>>>>>> 2. The changelog
>>>>>>> 3. The operation's result view
>>>>>>>
>>>>>>> That is, if I have a materialized Filter, then it seems
>>>>>>> like I
>>> _must_
>>>>>> store
>>>>>>> exactly the same record in the store and the changelog,
>>>>>>> and also
>>>> forward
>>>>>>> the exact same record, including the timestamp, to the
>>>>>>> downstream operations.
>>>>>>>
>>>>>>> If we store something different in the internal state
>>>>>>> store than
>>> the
>>>>>>> changelog, we can get a situation where the state is
>>>>>>> actually
>>>> different
>>>>>>> after restoration than it is during processing, and
>>>>>>> queries against
>>>> standbys
>>>>>>> would return different results than queries against the
>>>>>>> active tasks.
>>>>>>>
>>>>>>> Regarding storing something different in the store+changelog
>>>>>>> than we forward downstream, consider the following topology:
>>>>>>> sourceTable
>>>>>>>   .filter(someFilter, Materialized.as("f1"))
>>>>>>>   .filter(_ -> true, Materialized.as("f2"))
>>>>>>>
>>>>>>> If we didn't forward exactly the same data we store,
>>>>>>> then
>>> querying f2
>>>>>>> would return different results than querying f1, which
>>>>>>> is clearly
>>> not
>>>>>>> correct, given the topology.
>>>>>>>
>>>>>>> It seems like maybe what you have in mind is the preservation
>>>>>>> of stream time across restart/rebalance? This bug is still
>>>>>>> open, actually:
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9368
>>>>>>> It seems like solving that bug would be independent of
>>>>>>> KIP-557. I.e., KIP-557 makes that bug neither worse nor better.
>>>>>>>
>>>>>>> One other thought I had is maybe you were thinking that
>>>>>>> operators would update their internally tracked stream
>>>>>>> time, but still
>>> discard
>>>>>>> records? I think that _would_ be a bug. That is, if a
>>>>>>> record gets
>>>>>> discarded
>>>>>>> as idempotent, it should have no effect at all on the
>>>>>>> state of the application. Reflecting on my prior
>>>>>>> analysis of stream time, most of the cases
>>>> where
>>>>>> we
>>>>>>> track stream time is in Stream aggregations, and in
>>>>>>> those cases,
>>> if
>>>> an
>>>>>>> incoming record's timestamp is higher than the previous
>>>>>>> stream
>>> time,
>>>> it
>>>>>>> would already not be considered idempotent. So we would
>>>>>>> store,
>>> log,
>>>> and
>>>>>>> forward the result with the new timestamp. The only
>>>>>>> other case is Suppress. With respect to idempotence,
>>>> Suppress is
>>>>>>> equivalent to a stateless no-op transformation. All it
>>>>>>> does is
>>>> collect
>>>>>> and
>>>>>>> delay updates. It has no memory of what it previously
>>>>>>> emitted, so it
>>>> wouldn't
>>>>>>> be possible for it to check for idempotence anyway.
>>>>>>>
>>>>>>> Was that what you were thinking?
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> I also had the same thought about using the existing
>>>> "dropped-records"
>>>>>>>> metrics. However, I think in this case it would be
>>>>>>>> better to
>>> use a
>>>> new
>>>>>>>> metric because dropped idempotent updates is an
>>>>>>>> optimization,
>>> they
>>>> do
>>>>>>>> not represent missed records. The dropped idempotent
>>>>>>>> updates in general do not change the result and so do
>>>>>>>> not need a warn log message. Whereas dropped records
>>>>>>>> due to expired windows,
>>>> serialization
>>>>>>>> errors, or lateness might be something concerning
>>>>>>>> that need a
>>> warn
>>>> log
>>>>>>>> message.
>>>>>>>>
>>>>>>>> Looking at the metrics, you would be happy to see
>>>>>>>> "dropped-idempotent-updates" increase, because that
>>>>>>>> means
>>> Streams
>>>> gets
>>>>>>>> rid of no-ops downstream, but you would be concerned
>>>>>>>> if "dropped-records" would increase, because that
>>>>>>>> means your
>>> records
>>>> or
>>>>>>>> the configuration of your app has issues. The
>>>>>>>> "dropped-idempotent-updates" metric could also be an
>>>>>>>> indication
>>>> that
>>>>>>>> you could further optimize your setup, by getting rid
>>>>>>>> of
>>> idempotent
>>>>>>>> updates further upstream.
>>>>>>>>
>>>>>>>> Best, Bruno
>>>>>>>>
>>>>>>>> On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <
>>> wangguoz@gmail.com>
>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hello Richard,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP. I once reviewed it and was
>>>>>>>>> concerned about
>>>> its
>>>>>>> effects
>>>>>>>>> on stream time advancing. After reading the updated
>>>>>>>>> KIP I
>>> think
>>>> it
>>>>>> has
>>>>>>>>> answered a lot of them already.
>>>>>>>>>
>>>>>>>>> I have a couple minor comments still, otherwise I'm
>>>>>>>>> +1:
>>>>>>>>>
>>>>>>>>> 1) I want to clarify that for operations resulting
>>>>>>>>> in KTables
>>> (not
>>>>>> only
>>>>>>>>> aggregations, but consider KTable#filter that may
>>>>>>>>> also result
>>> in
>>>> a
>>>>>> new
>>>>>>>>> KTable), even if we drop emissions to the
>>>>>>>>> downstream topics we
>>>> would
>>>>>>> still
>>>>>>>>> append to the corresponding changelog if timestamp
>>>>>>>>> has
>>> changed.
>>>> This
>>>>>> is
>>>>>>>>> because the timestamps on the changelog is read by
>>>>>>>>> the standby
>>>> tasks
>>>>>>> which
>>>>>>>>> relies on them to infer its own stream time
>>>>>>>>> advancing.
>>>>>>>>>
>>>>>>>>> 2) About the metrics, in KIP-444 we are
>>>>>>>>> consolidating all
>>> types
>>>> of
>>>>>>>>> scenarios that can cause dropped records to the
>>>>>>>>> same metrics:
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>>>>>>>>>
>>>>>>>>> late-records-drop: INFO at processor node level, replaced by INFO
>>>>>>>>> task-level "dropped-records".
>>>>>>>>>
>>>>>>>>> skipped-records: INFO at thread and processor node
>>>>>>>>> level,
>>>> replaced by
>>>>>>> INFO
>>>>>>>>> task-level "dropped-records".
>>>>>>>>>
>>>>>>>>> expired-window-record-drop: DEBUG at state store
>>>>>>>>> level,
>>> replaced
>>>> by
>>>>>>> INFO
>>>>>>>>> task-level "dropped-records".
>>>>>>>>>
>>>>>>>>> The main idea is that instead of using different
>>>>>>>>> metrics to
>>>> indicate
>>>>>>>>> different types of scenarios, and users just alert
>>>>>>>>> on that
>>> single
>>>>>>> metrics.
>>>>>>>>> When alert triggers, they can look into the log4j
>>>>>>>>> for its
>>> causes
>>>> (we
>>>>>>> made
>>>>>>>>> sure that all sensor recordings of this metric
>>>>>>>>> would be
>>>> associated
>>>>>>> with a
>>>>>>>>> warning log4j).
>>>>>>>>>
>>>>>>>>> So I'd suggest that instead of introducing a new
>>>>>>>>> per-node "dropped-idempotent-updates", we just
>>>>>>>>> piggy-back on the
>>> existing
>>>>>>> task-level
>>>>>>>>> metric; unless we think that idempotent drops are
>>>>>>>>> more
>>> frequent
>>>> than
>>>>>>> others
>>>>>>>>> and also that they are not worth a warning log; in that
>>>>>>>>> case we can consider breaking this metric down with
>>>>>>>>> different tags, for example.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <
>>>>>> yohan.richard.yu@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Thanks for the votes so far! @Matthias or
>>>>>>>>>> @Guozhang Wang <gu...@confluent.io> it
>>> would
>>>> be
>>>>>>> great to
>>>>>>>>>> also get your input on this KIP.
>>>>>>>>>>
>>>>>>>>>> It looks to be pretty close to completion, so the
>>>>>>>>>> finishing
>>>> touches
>>>>>>> are all
>>>>>>>>>> we need. :)
>>>>>>>>>>
>>>>>>>>>> Best, Richard
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine
>>>>>>>>>> < Ghassan.Yammine@bazaarvoice.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all,
>>>>>>>>>>>
>>>>>>>>>>> +1 (non-binding)
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Ghassan
>>>>>>>>>>>
>>>>>>>>>>> On 3/2/20, 12:43 PM, "Bruno Cadonna"
>>>>>>>>>>> <bruno@confluent.io
>>>>
>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> EXTERNAL: This email originated from outside
>>>>>>>>>>> of
>>>> Bazaarvoice.
>>>>>>> Do not
>>>>>>>>>>> click any links or open any attachments unless
>>>>>>>>>>> you trust
>>> the
>>>>>>> sender and
>>>>>>>>>>> know the content is safe.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi Richard,
>>>>>>>>>>>
>>>>>>>>>>> +1 (non-binding)
>>>>>>>>>>>
>>>>>>>>>>> Best, Bruno
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 2, 2020 at 4:33 PM John Roesler <
>>>>>>> vvcephei@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Richard,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP!
>>>>>>>>>>>>
>>>>>>>>>>>> I'm +1 (binding)
>>>>>>>>>>>>
>>>>>>>>>>>> -john
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Feb 27, 2020, at 14:40, Richard Yu
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am proposing a new optimization to Kafka
>>>>>>>>>>>>> Streams
>>>> which
>>>>>>> would
>>>>>>>>>>> greatly
>>>>>>>>>>>>> reduce the number of idempotent updates
>>>>>>>>>>>>> (or
>>> no-ops)
>>>> in
>>>>>> the
>>>>>>> Kafka
>>>>>>>>>>> Streams
>>>>>>>>>>>>> DAG. A number of users have been interested
>>>>>>>>>>>>> in this
>>>> feature,
>>>>>> so
>>>>>>> it
>>>>>>>>>>> would be nice
>>>>>>>>>>>>> to pass this one in.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For information, the KIP is described
>>>>>>>>>>>>> below:
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>>>>>>>>>>>
>>>
>>
>>>>>>>>>>>>> We aim to make Kafka Streams more efficient
>>>>>>>>>>>>> by
>>>> adopting
>>>>>>> the "emit
>>>>>>>>>>> on
>>>>>>>>>>>>> change" reporting strategy.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please cast your vote!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Richard
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -- -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> -- -- Guozhang
>>>>>>
>>>>
>>>
>>>
>>> -- -- Guozhang
>>>
>>
>
-----BEGIN PGP SIGNATURE-----

iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5kC1kACgkQO4miYXKq
/OhHKA/+OewqjX248vjk6GO6Ex/f2kOJuIIDGb4/c0NlTIS/Iyat1+S8N9P58KNP
pg133xwdWHagU7wajYMktoFiPamQ+Cv+PPhr7qz38JdfVAvzpNb8tcsI/wr5apOQ
XNlBsPhQBLtO/JQUve72OqY/TC9unbpBfhA4tvdA/qkLNvDaX542SrZdlwXuqTKH
EBpgEPBrwaqJ5S65KTMs6Fppc5c2V3dWOAC7Ssql30OneUd/RS88oQ07oNkwZwss
tADw+tzXtw8a0C0PGtMoXhLrs9wipEsuGOP8N6uvuQCM7YoIvTyeBf3Cu7jG8NFB
r2caoWY4TZkqCRsrKe37nNbR8KpjkNQBxCZ7nvIJ9B3KCdB0JOFQXwYj1+23z6aX
T1otQ+0ZIg5lzpIFiHCzwzO5mo2VUEYryRvanw/f2S/LaaBIcg83Dz5TJIv8dFcd
mU7Vu1KXtpWTgpg48JkWd9qSwPqBaR+nvbdP/DnStwf9/9n5SSGgcdS83jw/w6RV
N1bX6YlDCFYeIIT14lrsbWiHSZpiFARZ0fn+VBm8DAF0g+mWlX5Hg30yHKujDj+h
qMDZkI2K2eoYRJaUFcS3yvr2RqCtgXMCEr+jrAVGHDaq+Lt4mbEJRZdon3MiF0Ht
WmEiNaQa7Tu5h+8P5Rb05kPAB6ODa7/sC0BxC54uRXLdPnNxQCs=
=nuG1
-----END PGP SIGNATURE-----

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Richard Yu <yo...@gmail.com>.
Hi all,

I have decided to pass this KIP with 2 binding votes and 3 non-binding
votes (including mine).
I will update KIP status shortly after this.

Best,
Richard

On Thu, Mar 5, 2020 at 3:45 PM Richard Yu <yo...@gmail.com>
wrote:

> Hi all,
>
> Just polling for some last changes on the name.
> I think that since there doesn't seem to be much objection to any major
> changes in the KIP, I will pass it this Friday.
>
> If you feel that we still need some more discussion, please let me know. :)
>
> Best,
> Richard
>
> P.S. Will start working on a PR for this one soon.
>
> On Wed, Mar 4, 2020 at 1:30 PM Guozhang Wang <wa...@gmail.com> wrote:
>
>> Regarding the metric name, I was actually trying to be consistent with the
>> node-level `suppression-emit` as I feel this one's characteristics are
>> closer to that. If other folks feel it is better to align with the
>> task-level "dropped-records", I think I can be convinced too.
>>
>>
>> Guozhang
>>
>> On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna <br...@confluent.io> wrote:
>>
>> > Hi all,
>> >
>> > may I make a non-binding proposal for the metric name? I would prefer
>> > "skipped-idempotent-updates" to be consistent with the
>> > "dropped-records".
>> >
>> > Best,
>> > Bruno
>> >
>> > On Tue, Mar 3, 2020 at 11:57 PM Richard Yu <yo...@gmail.com>
>> > wrote:
>> > >
>> > > Hi all,
>> > >
>> > > Thanks for the discussion!
>> > >
>> > > @Guozhang, I will make the corresponding changes to the KIP (i.e.
>> > renaming
>> > > the sensor and adding some notes).
>> > > With the current state of things, we are very close. Just need that
>> one
>> > > last binding vote.
>> > >
>> > > @Matthias J. Sax <ma...@confluent.io>  It would be ideal if we can
>> > also
>> > > get your last two cents on this as well.
>> > > Other than that, we are good.
>> > >
>> > > Best,
>> > > Richard
>> > >
>> > >
>> > > On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wa...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi Bruno, John:
>> > > >
>> > > > 1) That makes sense. If we consider them to be node-specific metrics
>> > that
>> > > > only applies to a subset of built-in processor nodes that are
>> > irrelevant to
>> > > > alert-relevant metrics (just like suppression-emit (rate | total)),
>> > they'd
>> > > > better be per-node instead of per-task and we would not associate
>> such
>> > > > events with warning. With that in mind, I'd suggest we consider
>> > renaming
>> > > > the metric without the `dropped` keyword to distinguish it with the
>> > > > per-task level sensor. How about "idempotent-update-skip (rate |
>> > total)"?
>> > > >
>> > > > Also a minor suggestion: we should clarify in the KIP / javadocs
>> which
>> > > > built-in processor nodes would have this metric while others don't.
>> > > >
>> > > > 2) About stream time tracking, there are multiple known issues that
>> we
>> > > > should close to improve our consistency semantics:
>> > > >
>> > > >  a. preserve stream time of active tasks across rebalances where
>> they
>> > may
>> > > > be migrated. This is what KAFKA-9368
>> > > > <https://issues.apache.org/jira/browse/KAFKA-9368> meant for.
>> > > >  b. preserve stream time of standby tasks to be aligned with the
>> active
>> > > > tasks, via the changelog topics.
>> > > >
>> > > > And what I'm more concerned about is b) here. For example: let's say we
>> > have a
>> > > > topology of `source -> A -> repartition -> B` where both A and B
>> have
>> > > > states along with changelogs, and both of them have standbys. If a
>> > record
>> > > > is piped from the source and completely traversed through the
>> topology,
>> > we
>> > > > need to make sure that the stream time inferred across:
>> > > >
>> > > > * active task A (inferred from the source record),
>> > > > * active task B (inferred from the derived record from repartition
>> > topic),
>> > > > * standby task A (inferred from the changelog topic of A's store),
>> > > > * standby task B (inferred from the changelog topic of B's store)
>> > > >
>> > > > are consistent (note I'm not saying they should be "exactly the
>> same",
>> > but
>> > > > consistent, meaning that they may have different values but as long
>> as
>> > that
>> > > > does not impact the time-based queries, it is fine). The main
>> > motivation is
>> > > > that on IQ, where both active and standby tasks could be accessed,
>> we
>> > can
>> > > > eventually improve our consistency guarantee to have 1)
>> > read-your-write, 2)
>> > > > consistency across stores, etc.
>> > > >
>> > > > I agree with John's assessment in the previous email, and just to
>> > clarify
>> > > > more concretely what I'm thinking.
>> > > >
>> > > >
>> > > > Guozhang
>> > > >
>> > > >
>> > > > On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vv...@apache.org>
>> > wrote:
>> > > >
>> > > > > Thanks, Guozhang and Bruno!
>> > > > >
>> > > > > 2)
>> > > > > I had a similar thought to both of you about the metrics, but I
>> > ultimately
>> > > > > came out with a conclusion like Bruno's. These aren't dropped
>> invalid
>> > > > > records, they're intentionally dropped, valid, but unnecessary,
>> > updates.
>> > > > > A "warning" for this case definitely seems wrong, and I'd also not
>> > > > > recommend
>> > > > > counting these events along with "dropped-records", because those
>> are
>> > > > > all dropped invalid records, e.g., late or null-keyed or couldn't
>> be
>> > > > > deserialized.
>> > > > >
>> > > > > Like Bruno pointed out, an operator should be concerned to see
>> > > > > non-zero "dropped-records", and would then consult the logs for
>> > warnings.
>> > > > > But that same person should be happy to see
>> > "dropped-idempotent-updates"
>> > > > > increasing, since it means they're saving time and money. Maybe
>> the
>> > name
>> > > > > of the metric could be different, but I couldn't think of a better
>> > one.
>> > > > > OTOH,
>> > > > > maybe it just stands out to us because we recently discussed those
>> > other
>> > > > > metrics in KIP-444?
>> > > > >
>> > > > > 1)
>> > > > > Maybe we should discuss this point more. It seems like we should
>> > maintain
>> > > > > an invariant that the following three objects always have exactly
>> the
>> > > > same
>> > > > > state (modulo flush boundaries):
>> > > > > 1. The internal state store
>> > > > > 2. The changelog
>> > > > > 3. The operation's result view
>> > > > >
>> > > > > That is, if I have a materialized Filter, then it seems like I
>> _must_
>> > > > store
>> > > > > exactly the same record in the store and the changelog, and also
>> > forward
>> > > > > the exact same record, including the timestamp, to the downstream
>> > > > > operations.
>> > > > >
>> > > > > If we store something different in the internal state store than
>> the
>> > > > > changelog, we can get a situation where the state is actually
>> > different
>> > > > > after
>> > > > > restoration than it is during processing, and queries against
>> > standbys
>> > > > > would
>> > > > > return different results than queries against the active tasks.
>> > > > >
>> > > > > Regarding storing something different in the store+changelog than
>> we
>> > > > > forward downstream, consider the following topology:
>> > > > > sourceTable
>> > > > >   .filter(someFilter, Materialized.as("f1"))
>> > > > >   .filter(_ -> true, Materialized.as("f2"))
>> > > > >
>> > > > > If we didn't forward exactly the same data we store, then
>> querying f2
>> > > > > would return different results than querying f1, which is clearly
>> not
>> > > > > correct, given the topology.
>> > > > >
>> > > > > It seems like maybe what you have in mind is the preservation of
>> > stream
>> > > > > time across restart/rebalance? This bug is still open, actually:
>> > > > > https://issues.apache.org/jira/browse/KAFKA-9368
>> > > > > It seems like solving that bug would be independent of KIP-557.
>> I.e.,
>> > > > > KIP-557 makes that bug neither worse nor better.
>> > > > >
>> > > > > One other thought I had is maybe you were thinking that operators
>> > > > > would update their internally tracked stream time, but still
>> discard
>> > > > > records? I think that _would_ be a bug. That is, if a record gets
>> > > > discarded
>> > > > > as idempotent, it should have no effect at all on the state of the
>> > > > > application.
>> > > > > Reflecting on my prior analysis of stream time, most of the cases
>> > where
>> > > > we
>> > > > > track stream time is in Stream aggregations, and in those cases,
>> if
>> > an
>> > > > > incoming record's timestamp is higher than the previous stream
>> time,
>> > it
>> > > > > would already not be considered idempotent. So we would store,
>> log,
>> > and
>> > > > > forward the result with the new timestamp.

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Richard Yu <yo...@gmail.com>.
Hi all,

Just polling for any last changes to the metric name.
Since there doesn't seem to be much objection to the major points of the
KIP, I plan to pass it this Friday.

If you feel that we still need some more discussion, please let me know. :)

Best,
Richard

P.S. Will start working on a PR for this one soon.


Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Regarding the metric name, I was actually trying to be consistent with the
node-level `suppression-emit`, as I feel this one's characteristics are
closer to that. If other folks feel it is better to align with the
task-level "dropped-records", I think I can be convinced too.


Guozhang
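
To make the distinction behind the two metric names concrete, here is a
minimal sketch in Python. It is a toy model, not Kafka Streams code: the
class EmitOnChangeNode, its fields, and the value-only comparison are
assumptions made for illustration. Only the metric names
"idempotent-update-skip" and "dropped-records" come from this thread. A
skipped idempotent update leaves the store, the changelog, and the forwarded
output untouched, while an invalid record is counted separately as dropped.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, bytes, int]  # (key, serialized value, timestamp)


@dataclass
class EmitOnChangeNode:
    """Hypothetical stand-in for one stateful processor node."""
    store: Dict[str, Tuple[bytes, int]] = field(default_factory=dict)
    changelog: List[Record] = field(default_factory=list)
    forwarded: List[Record] = field(default_factory=list)
    # Node-level counter: growth means redundant work was avoided.
    idempotent_update_skip_total: int = 0
    # Task-level counter in the thread: growth means something is wrong.
    dropped_records_total: int = 0

    def process(self, key: Optional[str], value: bytes, timestamp: int) -> None:
        if key is None:
            # Invalid input (null key): a dropped record, worth a warning.
            self.dropped_records_total += 1
            return
        previous = self.store.get(key)
        # Assumption: idempotence is decided on the serialized value alone.
        if previous is not None and previous[0] == value:
            # No-op update: touch neither store, changelog, nor downstream.
            self.idempotent_update_skip_total += 1
            return
        # Real change: store, log, and forward the same record together.
        self.store[key] = (value, timestamp)
        self.changelog.append((key, value, timestamp))
        self.forwarded.append((key, value, timestamp))


node = EmitOnChangeNode()
node.process("a", b"1", 10)   # new key: stored, logged, forwarded
node.process("a", b"1", 20)   # same value: skipped everywhere
node.process(None, b"x", 30)  # null key: dropped
node.process("a", b"2", 40)   # changed value: stored, logged, forwarded
print(node.idempotent_update_skip_total, node.dropped_records_total)  # prints "1 1"
```

In this model the changelog and the forwarded list always hold the same
records, so a restored store and a downstream view cannot diverge.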

On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi all,
>
> may I make a non-binding proposal for the metric name? I would prefer
> "skipped-idempotent-updates" to be consistent with the
> "dropped-records".
>
> Best,
> Bruno
>
> On Tue, Mar 3, 2020 at 11:57 PM Richard Yu <yo...@gmail.com>
> wrote:
> >
> > Hi all,
> >
> > Thanks for the discussion!
> >
> > @Guozhang, I will make the corresponding changes to the KIP (i.e.,
> > renaming the sensor and adding some notes).
> > With the current state of things, we are very close. We just need that
> > one last binding vote.
> >
> > @Matthias J. Sax <ma...@confluent.io>, it would be ideal if we could
> > also get your two cents on this.
> > Other than that, we are good.
> >
> > Best,
> > Richard
> >
> >
> > On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Hi Bruno, John:
> > >
> > > 1) That makes sense. If we consider them to be node-specific metrics
> > > that only apply to a subset of built-in processor nodes and are not
> > > alert-relevant (just like suppression-emit (rate | total)), they'd
> > > better be per-node instead of per-task, and we would not associate
> > > such events with a warning. With that in mind, I'd suggest we consider
> > > renaming the metric without the `dropped` keyword to distinguish it
> > > from the per-task level sensor. How about "idempotent-update-skip
> > > (rate | total)"?
> > >
> > > Also a minor suggestion: we should clarify in the KIP / javadocs which
> > > built-in processor nodes would have this metric and which would not.
> > >
> > > 2) About stream time tracking, there are multiple known issues that
> > > we should close to improve our consistency semantics:
> > >
> > >  a. preserve stream time of active tasks across rebalances where they
> > > may be migrated. This is what KAFKA-9368
> > > <https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
> > >  b. preserve stream time of standby tasks to be aligned with the active
> > > tasks, via the changelog topics.
> > >
> > > What I'm more concerned about is b) here. For example: let's say we
> > > have a topology of `source -> A -> repartition -> B` where both A and
> > > B have states along with changelogs, and both of them have standbys.
> > > If a record is piped from the source and completely traverses the
> > > topology, we need to make sure that the stream time inferred across:
> > >
> > > * active task A (inferred from the source record),
> > > * active task B (inferred from the derived record from the repartition
> > >   topic),
> > > * standby task A (inferred from the changelog topic of A's store),
> > > * standby task B (inferred from the changelog topic of B's store)
> > >
> > > is consistent (note I'm not saying they should be "exactly the same",
> > > but consistent, meaning that they may have different values, but as
> > > long as that does not impact the time-based queries, it is fine). The
> > > main motivation is that on IQ, where both active and standby tasks
> > > could be accessed, we can eventually improve our consistency guarantee
> > > to have 1) read-your-write, 2) consistency across stores, etc.
> > >
> > > I agree with John's assessment in the previous email, and just wanted
> > > to clarify more concretely what I'm thinking.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vv...@apache.org> wrote:
> > >
> > > > Thanks, Guozhang and Bruno!
> > > >
> > > > 2)
> > > > I had a similar thought to both of you about the metrics, but I
> > > > ultimately came to a conclusion like Bruno's. These aren't dropped
> > > > invalid records; they're intentionally dropped, valid, but
> > > > unnecessary updates. A "warning" for this case definitely seems
> > > > wrong, and I'd also not recommend counting these events along with
> > > > "dropped-records", because those are all dropped invalid records,
> > > > e.g., late or null-keyed or couldn't be deserialized.
> > > >
> > > > Like Bruno pointed out, an operator should be concerned to see
> > > > non-zero "dropped-records", and would then consult the logs for
> > > > warnings. But that same person should be happy to see
> > > > "dropped-idempotent-updates" increasing, since it means they're
> > > > saving time and money. Maybe the name of the metric could be
> > > > different, but I couldn't think of a better one. OTOH, maybe it just
> > > > stands out to us because we recently discussed those other metrics
> > > > in KIP-444?
> > > >
> > > > 1)
> > > > Maybe we should discuss this point more. It seems like we should
> > > > maintain an invariant that the following three objects always have
> > > > exactly the same state (modulo flush boundaries):
> > > > 1. The internal state store
> > > > 2. The changelog
> > > > 3. The operation's result view
> > > >
> > > > That is, if I have a materialized Filter, then it seems like I
> > > > _must_ store exactly the same record in the store and the changelog,
> > > > and also forward the exact same record, including the timestamp, to
> > > > the downstream operations.
> > > >
> > > > If we store something different in the internal state store than in
> > > > the changelog, we can get a situation where the state is actually
> > > > different after restoration than it is during processing, and
> > > > queries against standbys would return different results than queries
> > > > against the active tasks.
> > > >
> > > > Regarding storing something different in the store+changelog than we
> > > > forward downstream, consider the following topology:
> > > > sourceTable
> > > >   .filter(someFilter, Materialized.as("f1"))
> > > >   .filter((key, value) -> true, Materialized.as("f2"))
> > > >
> > > > If we didn't forward exactly the same data we store, then querying f2
> > > > would return different results than querying f1, which is clearly not
> > > > correct, given the topology.
> > > >
> > > > It seems like maybe what you have in mind is the preservation of
> > > > stream time across restart/rebalance? This bug is still open,
> > > > actually: https://issues.apache.org/jira/browse/KAFKA-9368
> > > > It seems like solving that bug would be independent of KIP-557.
> > > > I.e., KIP-557 makes that bug neither worse nor better.
> > > >
> > > > One other thought I had is maybe you were thinking that operators
> > > > would update their internally tracked stream time, but still discard
> > > > records? I think that _would_ be a bug. That is, if a record gets
> > > > discarded as idempotent, it should have no effect at all on the
> > > > state of the application.
> > > > Reflecting on my prior analysis of stream time, most of the cases
> > > > where we track stream time are in Stream aggregations, and in those
> > > > cases, if an incoming record's timestamp is higher than the previous
> > > > stream time, it would already not be considered idempotent. So we
> > > > would store, log, and forward the result with the new timestamp.
> > > > The only other case is Suppress. With respect to idempotence,
> > > > Suppress is equivalent to a stateless no-op transformation. All it
> > > > does is collect and delay updates. It has no memory of what it
> > > > previously emitted, so it wouldn't be possible for it to check for
> > > > idempotence anyway.
> > > >
> > > > Was that what you were thinking?
> > > > Thanks,
> > > > -John
> > > >
> > > >
> > > > On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> > > > > Hi Guozhang,
> > > > >
> > > > > I also had the same thought about using the existing
> > > > > "dropped-records" metric. However, I think in this case it would
> > > > > be better to use a new metric, because dropping idempotent updates
> > > > > is an optimization; they do not represent missed records. Dropped
> > > > > idempotent updates in general do not change the result and so do
> > > > > not need a warn log message, whereas dropped records due to
> > > > > expired windows, serialization errors, or lateness might be
> > > > > something concerning that needs a warn log message.
> > > > >
> > > > > Looking at the metrics, you would be happy to see
> > > > > "dropped-idempotent-updates" increase, because that means Streams
> > > > > gets rid of no-ops downstream, but you would be concerned if
> > > > > "dropped-records" increased, because that means your records or
> > > > > the configuration of your app has issues. The
> > > > > "dropped-idempotent-updates" metric could also be an indication
> > > > > that you could further optimize your setup by getting rid of
> > > > > idempotent updates further upstream.
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > > On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wa...@gmail.com> wrote:
> > > > > >
> > > > > > Hello Richard,
> > > > > >
> > > > > > Thanks for the KIP. I once reviewed it and was concerned about
> > > > > > its effects on stream time advancing. After reading the updated
> > > > > > KIP, I think it has answered a lot of those concerns already.
> > > > > >
> > > > > > I have a couple of minor comments still; otherwise I'm +1:
> > > > > >
> > > > > > 1) I want to clarify that for operations resulting in KTables
> > > > > > (not only aggregations, but consider KTable#filter, which may
> > > > > > also result in a new KTable), even if we drop emissions to the
> > > > > > downstream topics, we would still append to the corresponding
> > > > > > changelog if the timestamp has changed. This is because the
> > > > > > timestamps on the changelog are read by the standby tasks, which
> > > > > > rely on them to infer their own stream time advancing.
> > > > > >
> > > > > > 2) About the metrics, in KIP-444 we are consolidating all types
> of
> > > > > > scenarios that can cause dropped records to the same metrics:
> > > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > > > >
> > > > > > late-records-drop: INFO at processor node level, replaced by INFO
> > > > > > task-level "dropped-records".
> > > > > >
> > > > > > skipped-records: INFO at thread and processor node level,
> replaced by
> > > > INFO
> > > > > > task-level "dropped-records".
> > > > > >
> > > > > > expired-window-record-drop: DEBUG at state store level, replaced
> by
> > > > INFO
> > > > > > task-level "dropped-records".
> > > > > >
> > > > > > The main idea is that instead of using different metrics to
> indicate
> > > > > > different types of scenarios, and users just alert on that single
> > > > metrics.
> > > > > > When alert triggers, they can look into the log4j for its causes
> (we
> > > > made
> > > > > > sure that all sensor recordings of this metric would be
> associated
> > > > with a
> > > > > > warning log4j).
> > > > > >
> > > > > > So I'd suggest that instead of introducing a new per-node
> > > > > > "dropped-idempotent-updates", we just piggy-back on the existing
> > > > task-level
> > > > > > metric; unless we think that idempotent drops are more frequent
> than
> > > > others
> > > > > > and also they do not worth a warning log, in that case we can
> > > consider
> > > > > > break this metric down with different tags for example.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <
> > > yohan.richard.yu@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Thanks for the votes so far!
> > > > > > > @Matthias or @Guozhang Wang <gu...@confluent.io> it would
> be
> > > > great to
> > > > > > > also get your input on this KIP.
> > > > > > >
> > > > > > > It looks to be pretty close to completion, so the finishing
> touches
> > > > are all
> > > > > > > we need. :)
> > > > > > >
> > > > > > > Best,
> > > > > > > Richard
> > > > > > >
> > > > > > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
> > > > > > > Ghassan.Yammine@bazaarvoice.com> wrote:
> > > > > > >
> > > > > > > > Hello all,
> > > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Ghassan
> > > > > > > >
> > > > > > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io>
> > > wrote:
> > > > > > > >
> > > > > > > >
> > > > > > > >     Hi Richard,
> > > > > > > >
> > > > > > > >     +1 (non-binding)
> > > > > > > >
> > > > > > > >     Best,
> > > > > > > >     Bruno
> > > > > > > >
> > > > > > > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <
> > > > vvcephei@apache.org>
> > > > > > > > wrote:
> > > > > > > >     >
> > > > > > > >     > Hi Richard,
> > > > > > > >     >
> > > > > > > >     > Thanks for the KIP!
> > > > > > > >     >
> > > > > > > >     > I'm +1 (binding)
> > > > > > > >     >
> > > > > > > >     > -john
> > > > > > > >     >
> > > > > > > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > > > > > > >     > > Hi all,
> > > > > > > >     > >
> > > > > > > >     > > I am proposing a new optimization to Kafka Streams
> which
> > > > would
> > > > > > > > greatly
> > > > > > > >     > > reduce the number of idempotent updates (or no-ops)
> in
> > > the
> > > > Kafka
> > > > > > > > Streams
> > > > > > > >     > > DAG.
> > > > > > > >     > > A number of users have been interested in this
> feature,
> > > so
> > > > it
> > > > > > > > would be nice
> > > > > > > >     > > to pass this one in.
> > > > > > > >     > >
> > > > > > > >     > > For information, the KIP is described below:
> > > > > > > >     > >
> > > > > > > >
> > > > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > > > > > >     > >
> > > > > > > >     > > We aim to make Kafka Streams more efficient by
> adopting
> > > > the "emit
> > > > > > > > on
> > > > > > > >     > > change" reporting strategy.
> > > > > > > >     > >
> > > > > > > >     > > Please cast your vote!
> > > > > > > >     > >
> > > > > > > >     > > Best,
> > > > > > > >     > > Richard
> > > > > > > >     > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
>


-- 
-- Guozhang

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi all,

May I make a non-binding proposal for the metric name? I would prefer
"skipped-idempotent-updates", to be consistent with
"dropped-records".

Best,
Bruno

On Tue, Mar 3, 2020 at 11:57 PM Richard Yu <yo...@gmail.com> wrote:
>
> Hi all,
>
> Thanks for the discussion!
>
> @Guozhang, I will make the corresponding changes to the KIP (i.e. renaming
> the sensor and adding some notes).
> With the current state of things, we are very close. Just need that one
> last binding vote.
>
> @Matthias J. Sax <ma...@confluent.io>  It would be ideal if we could also
> get your last two cents on this.
> Other than that, we are good.
>
> Best,
> Richard
>
>
> On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Bruno, John:
> >
> > 1) That makes sense. If we consider them to be node-specific metrics that
> > apply only to a subset of built-in processor nodes and are not relevant
> > for alerting (just like suppression-emit (rate | total)), they should
> > be per-node instead of per-task, and we would not associate such
> > events with a warning. With that in mind, I'd suggest we consider renaming
> > the metric without the `dropped` keyword to distinguish it from the
> > per-task level sensor. How about "idempotent-update-skip (rate | total)"?
> >
> > Also a minor suggestion: we should clarify in the KIP / javadocs which
> > built-in processor nodes would have this metric while others don't.
> >
> > 2) About stream time tracking, there are multiple known issues that we
> > should close to improve our consistency semantics:
> >
> >  a. preserve stream time of active tasks across rebalances where they may
> > be migrated. This is what KAFKA-9368
> > <https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
> >  b. preserve stream time of standby tasks to be aligned with the active
> > tasks, via the changelog topics.
> >
> > What I'm more concerned about is b) here. For example: let's say we have a
> > topology of `source -> A -> repartition -> B` where both A and B have
> > state stores along with changelogs, and both of them have standbys. If a
> > record is piped from the source and completely traverses the topology, we
> > need to make sure that the stream time inferred across:
> >
> > * active task A (inferred from the source record),
> > * active task B (inferred from the derived record from repartition topic),
> > * standby task A (inferred from the changelog topic of A's store),
> > * standby task B (inferred from the changelog topic of B's store)
> >
> > are consistent (note I'm not saying they should be "exactly the same", but
> > consistent, meaning that they may have different values but as long as that
> > does not impact the time-based queries, it is fine). The main motivation is
> > that on IQ, where both active and standby tasks could be accessed, we can
> > eventually improve our consistency guarantee to have 1) read-your-write, 2)
> > consistency across stores, etc.
> >
> > I agree with John's assessment in the previous email, and just to clarify
> > more concretely what I'm thinking.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vv...@apache.org> wrote:
> >
> > > Thanks, Guozhang and Bruno!
> > >
> > > 2)
> > > I had a similar thought to both of you about the metrics, but I ultimately
> > > came to a conclusion like Bruno's. These aren't dropped invalid
> > > records, they're intentionally dropped, valid, but unnecessary, updates.
> > > A "warning" for this case definitely seems wrong, and I'd also not
> > > recommend
> > > counting these events along with "dropped-records", because those are
> > > all dropped invalid records, e.g., late or null-keyed or couldn't be
> > > deserialized.
> > >
> > > Like Bruno pointed out, an operator should be concerned to see
> > > non-zero "dropped-records", and would then consult the logs for warnings.
> > > But that same person should be happy to see "dropped-idempotent-updates"
> > > increasing, since it means they're saving time and money. Maybe the name
> > > of the metric could be different, but I couldn't think of a better one.
> > > OTOH,
> > > maybe it just stands out to us because we recently discussed those other
> > > metrics in KIP-444?
> > >
> > > 1)
> > > Maybe we should discuss this point more. It seems like we should maintain
> > > an invariant that the following three objects always have exactly the
> > same
> > > state (modulo flush boundaries):
> > > 1. The internal state store
> > > 2. The changelog
> > > 3. The operation's result view
> > >
> > > That is, if I have a materialized Filter, then it seems like I _must_
> > store
> > > exactly the same record in the store and the changelog, and also forward
> > > the exact same record, including the timestamp, to the downstream
> > > operations.
> > >
> > > If we store something different in the internal state store than the
> > > changelog, we can get a situation where the state is actually different
> > > after
> > > restoration than it is during processing, and queries against standbys
> > > would
> > > return different results than queries against the active tasks.
> > >
> > > Regarding storing something different in the store+changelog than we
> > > forward downstream, consider the following topology:
> > > sourceTable
> > >   .filter(someFilter, Materialized.as("f1"))
> > >   .filter((key, value) -> true, Materialized.as("f2"))
> > >
> > > If we didn't forward exactly the same data we store, then querying f2
> > > would return different results than querying f1, which is clearly not
> > > correct, given the topology.
> > >
> > > It seems like maybe what you have in mind is the preservation of stream
> > > time across restart/rebalance? This bug is still open, actually:
> > > https://issues.apache.org/jira/browse/KAFKA-9368
> > > It seems like solving that bug would be independent of KIP-557. I.e.,
> > > KIP-557 makes that bug neither worse nor better.
> > >
> > > One other thought I had is maybe you were thinking that operators
> > > would update their internally tracked stream time, but still discard
> > > records? I think that _would_ be a bug. That is, if a record gets
> > discarded
> > > as idempotent, it should have no effect at all on the state of the
> > > application.
> > > Reflecting on my prior analysis of stream time, most of the cases where
> > we
> > > track stream time is in Stream aggregations, and in those cases, if an
> > > incoming record's timestamp is higher than the previous stream time, it
> > > would already not be considered idempotent. So we would store, log, and
> > > forward the result with the new timestamp.
> > > The only other case is Suppress. With respect to idempotence, Suppress is
> > > equivalent to a stateless no-op transformation. All it does is collect
> > and
> > > delay
> > > updates. It has no memory of what it previously emitted, so it wouldn't
> > > be possible for it to check for idempotence anyway.
> > >
> > > Was that what you were thinking?
> > > Thanks,
> > > -John
> > >
> > >
> > > On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> > > > Hi Guozhang,
> > > >
> > > > I also had the same thought about using the existing "dropped-records"
> > > > metrics. However, I think in this case it would be better to use a new
> > > > metric because dropped idempotent updates is an optimization, they do
> > > > not represent missed records. The dropped idempotent updates in
> > > > general do not change the result and so do not need a warn log
> > > > message. Whereas dropped records due to expired windows, serialization
> > > > errors, or lateness might be something concerning that need a warn log
> > > > message.
> > > >
> > > > Looking at the metrics, you would be happy to see
> > > > "dropped-idempotent-updates" increase, because that means Streams gets
> > > > rid of no-ops downstream, but you would be concerned if
> > > > "dropped-records" would increase, because that means your records or
> > > > the configuration of your app has issues. The
> > > > "dropped-idempotent-updates" metric could also be an indication that
> > > > you could further optimize your setup, by getting rid of idempotent
> > > > updates further upstream.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > > > >
> > > > > Hello Richard,
> > > > >
> > > > > Thanks for the KIP. I once reviewed it and was concerned about its
> > > effects
> > > > > on stream time advancing. After reading the updated KIP I think it
> > has
> > > > > answered a lot of them already.
> > > > >
> > > > > I have a couple minor comments still, otherwise I'm +1:
> > > > >
> > > > > 1) I want to clarify that for operations resulted in KTables (not
> > only
> > > > > aggregations, but consider KTable#filter that may also result in a
> > new
> > > > > KTable), even if we drop emissions to the downstream topics we would
> > > still
> > > > > append to the corresponding changelog if timestamp has changed. This
> > is
> > > > > because the timestamps on the changelog is read by the standby tasks
> > > which
> > > > > relies on them to infer its own stream time advancing.
> > > > >
> > > > > 2) About the metrics, in KIP-444 we are consolidating all types of
> > > > > scenarios that can cause dropped records to the same metrics:
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > > >
> > > > > late-records-drop: INFO at processor node level, replaced by INFO
> > > > > task-level "dropped-records".
> > > > >
> > > > > skipped-records: INFO at thread and processor node level, replaced by
> > > INFO
> > > > > task-level "dropped-records".
> > > > >
> > > > > expired-window-record-drop: DEBUG at state store level, replaced by
> > > INFO
> > > > > task-level "dropped-records".
> > > > >
> > > > > The main idea is that, instead of using different metrics to indicate
> > > > > different types of scenarios, users just alert on that single metric.
> > > > > When the alert triggers, they can look into the log4j logs for its
> > > > > causes (we made sure that all sensor recordings of this metric would
> > > > > be associated with a log4j warning).
> > > > >
> > > > > So I'd suggest that instead of introducing a new per-node
> > > > > "dropped-idempotent-updates", we just piggy-back on the existing
> > > task-level
> > > > > metric; unless we think that idempotent drops are more frequent than
> > > others
> > > > > and also they do not worth a warning log, in that case we can
> > consider
> > > > > break this metric down with different tags for example.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <
> > yohan.richard.yu@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Thanks for the votes so far!
> > > > > > @Matthias or @Guozhang Wang <gu...@confluent.io> it would be
> > > great to
> > > > > > also get your input on this KIP.
> > > > > >
> > > > > > It looks to be pretty close to completion, so the finishing touches
> > > are all
> > > > > > we need. :)
> > > > > >
> > > > > > Best,
> > > > > > Richard
> > > > > >
> > > > > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
> > > > > > Ghassan.Yammine@bazaarvoice.com> wrote:
> > > > > >
> > > > > > > Hello all,
> > > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Ghassan
> > > > > > >
> > > > > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io>
> > wrote:
> > > > > > >
> > > > > > >     EXTERNAL: This email originated from outside of Bazaarvoice.
> > > Do not
> > > > > > > click any links or open any attachments unless you trust the
> > > sender and
> > > > > > > know the content is safe.
> > > > > > >
> > > > > > >
> > > > > > >     Hi Richard,
> > > > > > >
> > > > > > >     +1 (non-binding)
> > > > > > >
> > > > > > >     Best,
> > > > > > >     Bruno
> > > > > > >
> > > > > > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <
> > > vvcephei@apache.org>
> > > > > > > wrote:
> > > > > > >     >
> > > > > > >     > Hi Richard,
> > > > > > >     >
> > > > > > >     > Thanks for the KIP!
> > > > > > >     >
> > > > > > >     > I'm +1 (binding)
> > > > > > >     >
> > > > > > >     > -john
> > > > > > >     >
> > > > > > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > > > > > >     > > Hi all,
> > > > > > >     > >
> > > > > > >     > > I am proposing a new optimization to Kafka Streams which
> > > would
> > > > > > > greatly
> > > > > > >     > > reduce the number of idempotent updates (or no-ops) in
> > the
> > > Kafka
> > > > > > > Streams
> > > > > > >     > > DAG.
> > > > > > >     > > A number of users have been interested in this feature,
> > so
> > > it
> > > > > > > would be nice
> > > > > > >     > > to pass this one in.
> > > > > > >     > >
> > > > > > >     > > For information, the KIP is described below:
> > > > > > >     > >
> > > > > > >
> > > > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > > > > >     > >
> > > > > > >     > > We aim to make Kafka Streams more efficient by adopting
> > > the "emit
> > > > > > > on
> > > > > > >     > > change" reporting strategy.
> > > > > > >     > >
> > > > > > >     > > Please cast your vote!
> > > > > > >     > >
> > > > > > >     > > Best,
> > > > > > >     > > Richard
> > > > > > >     > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Richard Yu <yo...@gmail.com>.
Hi all,

Thanks for the discussion!

@Guozhang, I will make the corresponding changes to the KIP (i.e. renaming
the sensor and adding some notes).
With the current state of things, we are very close. Just need that one
last binding vote.

@Matthias J. Sax <ma...@confluent.io>  It would be ideal if we could also
get your last two cents on this.
Other than that, we are good.

Best,
Richard


On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Bruno, John:
>
> 1) That makes sense. If we consider them to be node-specific metrics that
> apply only to a subset of built-in processor nodes and are not relevant
> for alerting (just like suppression-emit (rate | total)), they should
> be per-node instead of per-task, and we would not associate such
> events with a warning. With that in mind, I'd suggest we consider renaming
> the metric without the `dropped` keyword to distinguish it from the
> per-task level sensor. How about "idempotent-update-skip (rate | total)"?
>
> Also a minor suggestion: we should clarify in the KIP / javadocs which
> built-in processor nodes would have this metric while others don't.
>
> 2) About stream time tracking, there are multiple known issues that we
> should close to improve our consistency semantics:
>
>  a. preserve stream time of active tasks across rebalances where they may
> be migrated. This is what KAFKA-9368
> <https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
>  b. preserve stream time of standby tasks to be aligned with the active
> tasks, via the changelog topics.
>
> What I'm more concerned about is b) here. For example: let's say we have a
> topology of `source -> A -> repartition -> B` where both A and B have
> state stores along with changelogs, and both of them have standbys. If a
> record is piped from the source and completely traverses the topology, we
> need to make sure that the stream time inferred across:
>
> * active task A (inferred from the source record),
> * active task B (inferred from the derived record from repartition topic),
> * standby task A (inferred from the changelog topic of A's store),
> * standby task B (inferred from the changelog topic of B's store)
>
> are consistent (note I'm not saying they should be "exactly the same", but
> consistent, meaning that they may have different values but as long as that
> does not impact the time-based queries, it is fine). The main motivation is
> that on IQ, where both active and standby tasks could be accessed, we can
> eventually improve our consistency guarantee to have 1) read-your-write, 2)
> consistency across stores, etc.
>
> I agree with John's assessment in the previous email, and just to clarify
> more concretely what I'm thinking.
>
>
> Guozhang
>
>
> On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vv...@apache.org> wrote:
>
> > Thanks, Guozhang and Bruno!
> >
> > 2)
> > I had a similar thought to both of you about the metrics, but I ultimately
> > came to a conclusion like Bruno's. These aren't dropped invalid
> > records, they're intentionally dropped, valid, but unnecessary, updates.
> > A "warning" for this case definitely seems wrong, and I'd also not
> > recommend
> > counting these events along with "dropped-records", because those are
> > all dropped invalid records, e.g., late or null-keyed or couldn't be
> > deserialized.
> >
> > Like Bruno pointed out, an operator should be concerned to see
> > non-zero "dropped-records", and would then consult the logs for warnings.
> > But that same person should be happy to see "dropped-idempotent-updates"
> > increasing, since it means they're saving time and money. Maybe the name
> > of the metric could be different, but I couldn't think of a better one.
> > OTOH,
> > maybe it just stands out to us because we recently discussed those other
> > metrics in KIP-444?
> >
> > 1)
> > Maybe we should discuss this point more. It seems like we should maintain
> > an invariant that the following three objects always have exactly the
> same
> > state (modulo flush boundaries):
> > 1. The internal state store
> > 2. The changelog
> > 3. The operation's result view
> >
> > That is, if I have a materialized Filter, then it seems like I _must_
> store
> > exactly the same record in the store and the changelog, and also forward
> > the exact same record, including the timestamp, to the downstream
> > operations.
> >
> > If we store something different in the internal state store than the
> > changelog, we can get a situation where the state is actually different
> > after
> > restoration than it is during processing, and queries against standbys
> > would
> > return different results than queries against the active tasks.
> >
> > Regarding storing something different in the store+changelog than we
> > forward downstream, consider the following topology:
> > sourceTable
> >   .filter(someFilter, Materialized.as("f1"))
> >   .filter((key, value) -> true, Materialized.as("f2"))
> >
> > If we didn't forward exactly the same data we store, then querying f2
> > would return different results than querying f1, which is clearly not
> > correct, given the topology.
> >
> > It seems like maybe what you have in mind is the preservation of stream
> > time across restart/rebalance? This bug is still open, actually:
> > https://issues.apache.org/jira/browse/KAFKA-9368
> > It seems like solving that bug would be independent of KIP-557. I.e.,
> > KIP-557 makes that bug neither worse nor better.
> >
> > One other thought I had is maybe you were thinking that operators
> > would update their internally tracked stream time, but still discard
> > records? I think that _would_ be a bug. That is, if a record gets
> discarded
> > as idempotent, it should have no effect at all on the state of the
> > application.
> > Reflecting on my prior analysis of stream time, most of the cases where
> we
> > track stream time is in Stream aggregations, and in those cases, if an
> > incoming record's timestamp is higher than the previous stream time, it
> > would already not be considered idempotent. So we would store, log, and
> > forward the result with the new timestamp.
> > The only other case is Suppress. With respect to idempotence, Suppress is
> > equivalent to a stateless no-op transformation. All it does is collect
> and
> > delay
> > updates. It has no memory of what it previously emitted, so it wouldn't
> > be possible for it to check for idempotence anyway.
> >
> > Was that what you were thinking?
> > Thanks,
> > -John
> >
> >
> > On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> > > Hi Guozhang,
> > >
> > > I also had the same thought about using the existing "dropped-records"
> > > metrics. However, I think in this case it would be better to use a new
> > > metric because dropped idempotent updates is an optimization, they do
> > > not represent missed records. The dropped idempotent updates in
> > > general do not change the result and so do not need a warn log
> > > message. Whereas dropped records due to expired windows, serialization
> > > errors, or lateness might be something concerning that need a warn log
> > > message.
> > >
> > > Looking at the metrics, you would be happy to see
> > > "dropped-idempotent-updates" increase, because that means Streams gets
> > > rid of no-ops downstream, but you would be concerned if
> > > "dropped-records" would increase, because that means your records or
> > > the configuration of your app has issues. The
> > > "dropped-idempotent-updates" metric could also be an indication that
> > > you could further optimize your setup, by getting rid of idempotent
> > > updates further upstream.
> > >
> > > Best,
> > > Bruno
> > >
> > > On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wa...@gmail.com>
> wrote:
> > > >
> > > > Hello Richard,
> > > >
> > > > Thanks for the KIP. I once reviewed it and was concerned about its
> > effects
> > > > on stream time advancing. After reading the updated KIP I think it
> has
> > > > answered a lot of them already.
> > > >
> > > > I have a couple minor comments still, otherwise I'm +1:
> > > >
> > > > 1) I want to clarify that for operations resulted in KTables (not
> only
> > > > aggregations, but consider KTable#filter that may also result in a
> new
> > > > KTable), even if we drop emissions to the downstream topics we would
> > still
> > > > append to the corresponding changelog if timestamp has changed. This
> is
> > > > because the timestamps on the changelog is read by the standby tasks
> > which
> > > > relies on them to infer its own stream time advancing.
> > > >
> > > > 2) About the metrics, in KIP-444 we are consolidating all types of
> > > > scenarios that can cause dropped records to the same metrics:
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > >
> > > > late-records-drop: INFO at processor node level, replaced by INFO
> > > > task-level "dropped-records".
> > > >
> > > > skipped-records: INFO at thread and processor node level, replaced by
> > INFO
> > > > task-level "dropped-records".
> > > >
> > > > expired-window-record-drop: DEBUG at state store level, replaced by
> > INFO
> > > > task-level "dropped-records".
> > > >
> > > > The main idea is that, instead of using different metrics to indicate
> > > > different types of scenarios, users just alert on that single metric.
> > > > When the alert triggers, they can look into the log4j logs for its
> > > > causes (we made sure that all sensor recordings of this metric would
> > > > be associated with a log4j warning).
> > > >
> > > > So I'd suggest that instead of introducing a new per-node
> > > > "dropped-idempotent-updates", we just piggy-back on the existing
> > task-level
> > > > metric; unless we think that idempotent drops are more frequent than
> > others
> > > > and also they do not worth a warning log, in that case we can
> consider
> > > > break this metric down with different tags for example.
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <
> yohan.richard.yu@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for the votes so far!
> > > > > @Matthias or @Guozhang Wang <gu...@confluent.io> it would be
> > great to
> > > > > also get your input on this KIP.
> > > > >
> > > > > It looks to be pretty close to completion, so the finishing touches
> > are all
> > > > > we need. :)
> > > > >
> > > > > Best,
> > > > > Richard
> > > > >
> > > > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
> > > > > Ghassan.Yammine@bazaarvoice.com> wrote:
> > > > >
> > > > > > Hello all,
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Ghassan
> > > > > >
> > > > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io>
> wrote:
> > > > > >
> > > > > >     EXTERNAL: This email originated from outside of Bazaarvoice.
> > Do not
> > > > > > click any links or open any attachments unless you trust the
> > sender and
> > > > > > know the content is safe.
> > > > > >
> > > > > >
> > > > > >     Hi Richard,
> > > > > >
> > > > > >     +1 (non-binding)
> > > > > >
> > > > > >     Best,
> > > > > >     Bruno
> > > > > >
> > > > > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <
> > vvcephei@apache.org>
> > > > > > wrote:
> > > > > >     >
> > > > > >     > Hi Richard,
> > > > > >     >
> > > > > >     > Thanks for the KIP!
> > > > > >     >
> > > > > >     > I'm +1 (binding)
> > > > > >     >
> > > > > >     > -john
> > > > > >     >
> > > > > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > > > > >     > > Hi all,
> > > > > >     > >
> > > > > >     > > I am proposing a new optimization to Kafka Streams which
> > would
> > > > > > greatly
> > > > > >     > > reduce the number of idempotent updates (or no-ops) in
> the
> > Kafka
> > > > > > Streams
> > > > > >     > > DAG.
> > > > > >     > > A number of users have been interested in this feature,
> so
> > it
> > > > > > would be nice
> > > > > >     > > to pass this one in.
> > > > > >     > >
> > > > > >     > > For information, the KIP is described below:
> > > > > >     > >
> > > > > >
> > > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > > > >     > >
> > > > > >     > > We aim to make Kafka Streams more efficient by adopting
> > the "emit
> > > > > > on
> > > > > >     > > change" reporting strategy.
> > > > > >     > >
> > > > > >     > > Please cast your vote!
> > > > > >     > >
> > > > > >     > > Best,
> > > > > >     > > Richard
> > > > > >     > >
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Bruno, John:

1) That makes sense. If we consider them to be node-specific metrics that
only apply to a subset of built-in processor nodes and that are not
alert-relevant (just like suppression-emit (rate | total)), they'd
better be per-node instead of per-task, and we would not associate such
events with a warning. With that in mind, I'd suggest we consider renaming
the metric without the `dropped` keyword to distinguish it from the
per-task level sensor. How about "idempotent-update-skip (rate | total)"?

Also a minor suggestion: we should clarify in the KIP / javadocs which
built-in processor nodes would have this metric and which would not.

2) About stream time tracking, there are multiple known issues that we
should close to improve our consistency semantics:

 a. preserve stream time of active tasks across rebalances where they may
be migrated. This is what KAFKA-9368
<https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
 b. preserve stream time of standby tasks to be aligned with the active
tasks, via the changelog topics.

What I'm more concerned about is b) here. For example: let's say we have a
topology of `source -> A -> repartition -> B` where both A and B have
states along with changelogs, and both of them have standbys. If a record
is piped from the source and has completely traversed the topology, we
need to make sure that the stream time inferred across:

* active task A (inferred from the source record),
* active task B (inferred from the derived record from repartition topic),
* standby task A (inferred from the changelog topic of A's store),
* standby task B (inferred from the changelog topic of B's store)

are consistent (note I'm not saying they should be "exactly the same", but
consistent, meaning that they may have different values but as long as that
does not impact the time-based queries, it is fine). The main motivation is
that on IQ, where both active and standby tasks could be accessed, we can
eventually improve our consistency guarantee to have 1) read-your-write, 2)
consistency across stores, etc.
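
To make the concern in b) concrete, here is a toy sketch in plain Java. It
is not Kafka Streams code: the class and method names are hypothetical, and
it only assumes that stream time is the highest record timestamp observed so
far. It shows how an active task (reading the source) and a standby task
(reading the changelog) can infer different stream times when one update
never reaches the changelog.

```java
/** Illustrative only: a toy model of stream-time inference, not Kafka Streams code. */
class StreamTimeSketch {

    /**
     * Returns the highest timestamp in the given sequence.
     * The -1 sentinel for "nothing observed yet" is an assumption of this sketch.
     */
    static long streamTimeOf(long... timestamps) {
        long streamTime = -1L;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
        }
        return streamTime;
    }

    public static void main(String[] args) {
        // The active task sees every source record.
        long[] sourceRecords = {10L, 20L, 30L};
        // Hypothetical case: the update at t=30 was not appended to the changelog,
        // so the standby task only sees the first two timestamps.
        long[] changelogRecords = {10L, 20L};

        System.out.println("active=" + streamTimeOf(sourceRecords));
        System.out.println("standby=" + streamTimeOf(changelogRecords));
    }
}
```

Whether such a gap matters depends on whether it can change the answer of a
time-based query, which is the sense of "consistent" used above.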

I agree with John's assessment in the previous email; I just wanted to
clarify more concretely what I'm thinking.


Guozhang


On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vv...@apache.org> wrote:

> Thanks, Guozhang and Bruno!
>
> 2)
> I had a similar though to both of you about the metrics, but I ultimately
> came out with a conclusion like Bruno's. These aren't dropped invalid
> records, they're intentionally dropped, valid, but unnecessary, updates.
> A "warning" for this case definitely seems wrong, and I'd also not
> recommend
> counting these events along with "dropped-records", because those are
> all dropped invalid records, e.g., late or null-keyed or couldn't be
> deserialized.
>
> Like Bruno pointed out, an operator should be concerned to see
> non-zero "dropped-records", and would then consult the logs for warnings.
> But that same person should be happy to see "dropped-idempotent-updates"
> increasing, since it means they're saving time and money. Maybe the name
> of the metric could be different, but I couldn't think of a better one.
> OTOH,
> maybe it just stands out to us because we recently discussed those other
> metrics in KIP-444?
>
> 1)
> Maybe we should discuss this point more. It seems like we should maintain
> an invariant that the following three objects always have exactly the same
> state (modulo flush boundaries):
> 1. The internal state store
> 2. The changelog
> 3. The operation's result view
>
> That is, if I have a materialized Filter, then it seems like I _must_ store
> exactly the same record in the store and the changelog, and also forward
> the exact same record, including the timestamp, to the downstream
> operations.
>
> If we store something different in the internal state store than the
> changelog, we can get a situation where the state is actually different
> after
> restoration than it is during processing, and queries against standbys
> would
> return different results than queries against the active tasks.
>
> Regarding storing something different in the store+changelog than we
> forward downstream, consider the following topology:
> sourceTable
>   .filter(someFilter, Materialized.as("f1"))
>   .filter(_ -> true, Materialized.as("f2"))
>
> If we didn't forward exactly the same data we store, then querying f2
> would return different results than querying f1, which is clearly not
> correct, given the topology.
>
> It seems like maybe what you have in mind is the preservation of stream
> time across restart/rebalance? This bug is still open, actually:
> https://issues.apache.org/jira/browse/KAFKA-9368
> It seems like solving that bug would be independent of KIP-557. I.e.,
> KIP-557 neither makes that bug worse or better.
>
> One other thought I had is maybe you were thinking that operators
> would update their internally tracked stream time, but still discard
> records? I think that _would_ be a bug. That is, if a record gets discarded
> as idempotent, it should have no effect at all on the state of the
> application.
> Reflecting on my prior analysis of stream time, most of the cases where we
> track stream time is in Stream aggregations, and in those cases, if an
> incoming record's timestamp is higher than the previous stream time, it
> would already not be considered idempotent. So we would store, log, and
> forward the result with the new timestamp.
> The only other case is Suppress. With respect to idempotence, Suppress is
> equivalent to a stateless no-op transformation. All it does is collect and
> delay
> updates. It has no memory of what it previously emitted, so it wouldn't
> be possible for it to check for idempotence anyway.
>
> Was that what you were thinking?
> Thanks,
> -John
>
>
> On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> > Hi Guozhang,
> >
> > I also had the same thought about using the existing "dropped-records"
> > metrics. However, I think in this case it would be better to use a new
> > metric because dropped idempotent updates is an optimization, they do
> > not represent missed records. The dropped idempotent updates in
> > general do not change the result and so do not need a warn log
> > message. Whereas dropped records due to expired windows, serialization
> > errors, or lateness might be something concerning that need a warn log
> > message.
> >
> > Looking at the metrics, you would be happy to see
> > "dropped-idempotent-updates" increase, because that means Streams gets
> > rid of no-ops downstream, but you would be concerned if
> > "dropped-records" would increase, because that means your records or
> > the configuration of your app has issues. The
> > "dropped-idempotent-updates" metric could also be an indication that
> > you could further optimize your setup, by getting rid of idempotent
> > updates further upstream.
> >
> > Best,
> > Bruno
> >
> > On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wa...@gmail.com> wrote:
> > >
> > > Hello Richard,
> > >
> > > Thanks for the KIP. I once reviewed it and was concerned about its
> effects
> > > on stream time advancing. After reading the updated KIP I think it has
> > > answered a lot of them already.
> > >
> > > I have a couple minor comments still, otherwise I'm +1:
> > >
> > > 1) I want to clarify that for operations resulted in KTables (not only
> > > aggregations, but consider KTable#filter that may also result in a new
> > > KTable), even if we drop emissions to the downstream topics we would
> still
> > > append to the corresponding changelog if timestamp has changed. This is
> > > because the timestamps on the changelog is read by the standby tasks
> which
> > > relies on them to infer its own stream time advancing.
> > >
> > > 2) About the metrics, in KIP-444 we are consolidating all types of
> > > scenarios that can cause dropped records to the same metrics:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > >
> > > late-records-drop: INFO at processor node level, replaced by INFO
> > > task-level "dropped-records".
> > >
> > > skipped-records: INFO at thread and processor node level, replaced by
> INFO
> > > task-level "dropped-records".
> > >
> > > expired-window-record-drop: DEBUG at state store level, replaced by
> INFO
> > > task-level "dropped-records".
> > >
> > > The main idea is that instead of using different metrics to indicate
> > > different types of scenarios, and users just alert on that single
> metrics.
> > > When alert triggers, they can look into the log4j for its causes (we
> made
> > > sure that all sensor recordings of this metric would be associated
> with a
> > > warning log4j).
> > >
> > > So I'd suggest that instead of introducing a new per-node
> > > "dropped-idempotent-updates", we just piggy-back on the existing
> task-level
> > > metric; unless we think that idempotent drops are more frequent than
> others
> > > and also they do not worth a warning log, in that case we can consider
> > > break this metric down with different tags for example.
> > >
> > > Guozhang
> > >
> > > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <yo...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for the votes so far!
> > > > @Matthias or @Guozhang Wang <gu...@confluent.io> it would be
> great to
> > > > also get your input on this KIP.
> > > >
> > > > It looks to be pretty close to completion, so the finishing touches
> are all
> > > > we need. :)
> > > >
> > > > Best,
> > > > Richard
> > > >
> > > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
> > > > Ghassan.Yammine@bazaarvoice.com> wrote:
> > > >
> > > > > Hello all,
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Ghassan
> > > > >
> > > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
> > > > >
> > > > >
> > > > >
> > > > >     Hi Richard,
> > > > >
> > > > >     +1 (non-binding)
> > > > >
> > > > >     Best,
> > > > >     Bruno
> > > > >
> > > > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <
> vvcephei@apache.org>
> > > > > wrote:
> > > > >     >
> > > > >     > Hi Richard,
> > > > >     >
> > > > >     > Thanks for the KIP!
> > > > >     >
> > > > >     > I'm +1 (binding)
> > > > >     >
> > > > >     > -john
> > > > >     >
> > > > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > > > >     > > Hi all,
> > > > >     > >
> > > > >     > > I am proposing a new optimization to Kafka Streams which
> would
> > > > > greatly
> > > > >     > > reduce the number of idempotent updates (or no-ops) in the
> Kafka
> > > > > Streams
> > > > >     > > DAG.
> > > > >     > > A number of users have been interested in this feature, so
> it
> > > > > would be nice
> > > > >     > > to pass this one in.
> > > > >     > >
> > > > >     > > For information, the KIP is described below:
> > > > >     > >
> > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > > >     > >
> > > > >     > > We aim to make Kafka Streams more efficient by adopting
> the "emit
> > > > > on
> > > > >     > > change" reporting strategy.
> > > > >     > >
> > > > >     > > Please cast your vote!
> > > > >     > >
> > > > >     > > Best,
> > > > >     > > Richard
> > > > >     > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
>


-- 
-- Guozhang

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by John Roesler <vv...@apache.org>.
Thanks, Guozhang and Bruno!

2)
I had a similar thought to both of you about the metrics, but I ultimately
came to a conclusion like Bruno's. These aren't dropped invalid
records, they're intentionally dropped, valid, but unnecessary, updates.
A "warning" for this case definitely seems wrong, and I'd also not recommend
counting these events along with "dropped-records", because those are
all dropped invalid records, e.g., late or null-keyed or couldn't be deserialized.

Like Bruno pointed out, an operator should be concerned to see
non-zero "dropped-records", and would then consult the logs for warnings.
But that same person should be happy to see "dropped-idempotent-updates"
increasing, since it means they're saving time and money. Maybe the name
of the metric could be different, but I couldn't think of a better one. OTOH,
maybe it just stands out to us because we recently discussed those other
metrics in KIP-444?

1)
Maybe we should discuss this point more. It seems like we should maintain
an invariant that the following three objects always have exactly the same
state (modulo flush boundaries):
1. The internal state store
2. The changelog
3. The operation's result view

That is, if I have a materialized Filter, then it seems like I _must_ store
exactly the same record in the store and the changelog, and also forward
the exact same record, including the timestamp, to the downstream
operations. 

If we store something different in the internal state store than the 
changelog, we can get a situation where the state is actually different after
restoration than it is during processing, and queries against standbys would
return different results than queries against the active tasks.

Regarding storing something different in the store+changelog than we
forward downstream, consider the following topology:
sourceTable
  .filter(someFilter, Materialized.as("f1"))
  .filter(_ -> true, Materialized.as("f2"))

If we didn't forward exactly the same data we store, then querying f2
would return different results than querying f1, which is clearly not
correct, given the topology.
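
The invariant above can be sketched with a toy model in plain Java. This is
not Kafka Streams code: the class, fields, and method are hypothetical, and
it assumes that value equality alone decides whether an update is
idempotent. A skipped update touches nothing, and a real update writes the
identical record (including its timestamp) to the store, the changelog, and
the downstream view.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a toy emit-on-change node, not Kafka Streams code. */
class EmitOnChangeSketch {
    final Map<String, String> values = new HashMap<>();   // stands in for the state store (values)
    final Map<String, Long> timestamps = new HashMap<>(); // stands in for the state store (timestamps)
    final List<String> changelog = new ArrayList<>();     // stands in for the changelog topic
    final List<String> forwarded = new ArrayList<>();     // stands in for the downstream result view
    long skippedIdempotentUpdates = 0;

    /** Processes one non-null update; tombstones are out of scope for this sketch. */
    void process(String key, String value, long timestamp) {
        // Assumption: an update is idempotent when the stored value is equal.
        if (value.equals(values.get(key))) {
            skippedIdempotentUpdates++;
            return; // no store write, no changelog append, no forward
        }
        values.put(key, value);
        timestamps.put(key, timestamp);
        String record = key + "=" + value + "@" + timestamp;
        changelog.add(record);  // same record is logged...
        forwarded.add(record);  // ...and forwarded, timestamp included
    }

    public static void main(String[] args) {
        EmitOnChangeSketch node = new EmitOnChangeSketch();
        node.process("k", "a", 1L);
        node.process("k", "a", 2L); // same value: skipped entirely
        node.process("k", "b", 3L);
        System.out.println("changelog=" + node.changelog);
        System.out.println("forwarded=" + node.forwarded);
        System.out.println("skipped=" + node.skippedIdempotentUpdates);
    }
}
```

In this model the stored timestamp for "k" stays at 1 after the skipped
update, so the store, the changelog, and the forwarded view never disagree.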

It seems like maybe what you have in mind is the preservation of stream
time across restart/rebalance? This bug is still open, actually:
https://issues.apache.org/jira/browse/KAFKA-9368
It seems like solving that bug would be independent of KIP-557. I.e.,
KIP-557 makes that bug neither worse nor better.

One other thought I had is maybe you were thinking that operators
would update their internally tracked stream time, but still discard
records? I think that _would_ be a bug. That is, if a record gets discarded
as idempotent, it should have no effect at all on the state of the application.
Reflecting on my prior analysis of stream time, most of the cases where we
track stream time are in Stream aggregations, and in those cases, if an
incoming record's timestamp is higher than the previous stream time, it
would already not be considered idempotent. So we would store, log, and
forward the result with the new timestamp.
The only other case is Suppress. With respect to idempotence, Suppress is
equivalent to a stateless no-op transformation. All it does is collect and delay
updates. It has no memory of what it previously emitted, so it wouldn't 
be possible for it to check for idempotence anyway.

Was that what you were thinking?
Thanks,
-John


On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> Hi Guozhang,
> 
> I also had the same thought about using the existing "dropped-records"
> metrics. However, I think in this case it would be better to use a new
> metric because dropped idempotent updates is an optimization, they do
> not represent missed records. The dropped idempotent updates in
> general do not change the result and so do not need a warn log
> message. Whereas dropped records due to expired windows, serialization
> errors, or lateness might be something concerning that need a warn log
> message.
> 
> Looking at the metrics, you would be happy to see
> "dropped-idempotent-updates" increase, because that means Streams gets
> rid of no-ops downstream, but you would be concerned if
> "dropped-records" would increase, because that means your records or
> the configuration of your app has issues. The
> "dropped-idempotent-updates" metric could also be an indication that
> you could further optimize your setup, by getting rid of idempotent
> updates further upstream.
> 
> Best,
> Bruno
> 
> On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > Hello Richard,
> >
> > Thanks for the KIP. I once reviewed it and was concerned about its effects
> > on stream time advancing. After reading the updated KIP I think it has
> > answered a lot of them already.
> >
> > I have a couple minor comments still, otherwise I'm +1:
> >
> > 1) I want to clarify that for operations resulted in KTables (not only
> > aggregations, but consider KTable#filter that may also result in a new
> > KTable), even if we drop emissions to the downstream topics we would still
> > append to the corresponding changelog if timestamp has changed. This is
> > because the timestamps on the changelog is read by the standby tasks which
> > relies on them to infer its own stream time advancing.
> >
> > 2) About the metrics, in KIP-444 we are consolidating all types of
> > scenarios that can cause dropped records to the same metrics:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >
> > late-records-drop: INFO at processor node level, replaced by INFO
> > task-level "dropped-records".
> >
> > skipped-records: INFO at thread and processor node level, replaced by INFO
> > task-level "dropped-records".
> >
> > expired-window-record-drop: DEBUG at state store level, replaced by INFO
> > task-level "dropped-records".
> >
> > The main idea is that instead of using different metrics to indicate
> > different types of scenarios, and users just alert on that single metrics.
> > When alert triggers, they can look into the log4j for its causes (we made
> > sure that all sensor recordings of this metric would be associated with a
> > warning log4j).
> >
> > So I'd suggest that instead of introducing a new per-node
> > "dropped-idempotent-updates", we just piggy-back on the existing task-level
> > metric; unless we think that idempotent drops are more frequent than others
> > and also they do not worth a warning log, in that case we can consider
> > break this metric down with different tags for example.
> >
> > Guozhang
> >
> > On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <yo...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the votes so far!
> > > @Matthias or @Guozhang Wang <gu...@confluent.io> it would be great to
> > > also get your input on this KIP.
> > >
> > > It looks to be pretty close to completion, so the finishing touches are all
> > > we need. :)
> > >
> > > Best,
> > > Richard
> > >
> > > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
> > > Ghassan.Yammine@bazaarvoice.com> wrote:
> > >
> > > > Hello all,
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Thanks,
> > > >
> > > > Ghassan
> > > >
> > > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
> > > >
> > > >
> > > >
> > > >     Hi Richard,
> > > >
> > > >     +1 (non-binding)
> > > >
> > > >     Best,
> > > >     Bruno
> > > >
> > > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vv...@apache.org>
> > > > wrote:
> > > >     >
> > > >     > Hi Richard,
> > > >     >
> > > >     > Thanks for the KIP!
> > > >     >
> > > >     > I'm +1 (binding)
> > > >     >
> > > >     > -john
> > > >     >
> > > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > > >     > > Hi all,
> > > >     > >
> > > >     > > I am proposing a new optimization to Kafka Streams which would
> > > > greatly
> > > >     > > reduce the number of idempotent updates (or no-ops) in the Kafka
> > > > Streams
> > > >     > > DAG.
> > > >     > > A number of users have been interested in this feature, so it
> > > > would be nice
> > > >     > > to pass this one in.
> > > >     > >
> > > >     > > For information, the KIP is described below:
> > > >     > >
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > >     > >
> > > >     > > We aim to make Kafka Streams more efficient by adopting the "emit
> > > > on
> > > >     > > change" reporting strategy.
> > > >     > >
> > > >     > > Please cast your vote!
> > > >     > >
> > > >     > > Best,
> > > >     > > Richard
> > > >     > >
> > > >
> > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
>

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Guozhang,

I also had the same thought about using the existing "dropped-records"
metrics. However, I think in this case it would be better to use a new
metric because dropping idempotent updates is an optimization; they do
not represent missed records. The dropped idempotent updates in
general do not change the result and so do not need a warn log
message. Whereas dropped records due to expired windows, serialization
errors, or lateness might be something concerning that needs a warn log
message.

Looking at the metrics, you would be happy to see
"dropped-idempotent-updates" increase, because that means Streams gets
rid of no-ops downstream, but you would be concerned if
"dropped-records" increased, because that means your records or
the configuration of your app has issues. The
"dropped-idempotent-updates" metric could also be an indication that
you could further optimize your setup, by getting rid of idempotent
updates further upstream.
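
As a rough illustration of the distinction, here is a toy sketch in plain
Java. It does not use the Kafka metrics API: the class and method names are
hypothetical, and only the two metric names come from this thread. Invalid
drops increment one counter and produce a warning, while idempotent drops
increment a separate counter silently.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: two separate drop counters, not the Kafka metrics API. */
class DropMetricsSketch {
    long droppedRecords = 0;            // models "dropped-records"
    long droppedIdempotentUpdates = 0;  // models "dropped-idempotent-updates"
    final List<String> warnings = new ArrayList<>(); // stands in for WARN log lines

    /** An invalid record (late, null-keyed, undeserializable) was dropped. */
    void recordInvalidDrop(String reason) {
        droppedRecords++;
        warnings.add("WARN dropping record: " + reason);
    }

    /** A valid but unnecessary update was skipped; deliberately no warning. */
    void recordIdempotentDrop() {
        droppedIdempotentUpdates++;
    }

    /** An operator would alert only on invalid drops. */
    boolean shouldAlert() {
        return droppedRecords > 0;
    }

    public static void main(String[] args) {
        DropMetricsSketch metrics = new DropMetricsSketch();
        metrics.recordIdempotentDrop();
        metrics.recordIdempotentDrop();
        System.out.println("alert after idempotent drops: " + metrics.shouldAlert());
        metrics.recordInvalidDrop("null key");
        System.out.println("alert after invalid drop: " + metrics.shouldAlert());
        System.out.println(metrics.warnings);
    }
}
```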

Best,
Bruno

On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> Hello Richard,
>
> Thanks for the KIP. I once reviewed it and was concerned about its effects
> on stream time advancing. After reading the updated KIP I think it has
> answered a lot of them already.
>
> I have a couple minor comments still, otherwise I'm +1:
>
> 1) I want to clarify that for operations resulted in KTables (not only
> aggregations, but consider KTable#filter that may also result in a new
> KTable), even if we drop emissions to the downstream topics we would still
> append to the corresponding changelog if timestamp has changed. This is
> because the timestamps on the changelog is read by the standby tasks which
> relies on them to infer its own stream time advancing.
>
> 2) About the metrics, in KIP-444 we are consolidating all types of
> scenarios that can cause dropped records to the same metrics:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>
> late-records-drop: INFO at processor node level, replaced by INFO
> task-level "dropped-records".
>
> skipped-records: INFO at thread and processor node level, replaced by INFO
> task-level "dropped-records".
>
> expired-window-record-drop: DEBUG at state store level, replaced by INFO
> task-level "dropped-records".
>
> The main idea is that instead of using different metrics to indicate
> different types of scenarios, and users just alert on that single metrics.
> When alert triggers, they can look into the log4j for its causes (we made
> sure that all sensor recordings of this metric would be associated with a
> warning log4j).
>
> So I'd suggest that instead of introducing a new per-node
> "dropped-idempotent-updates", we just piggy-back on the existing task-level
> metric; unless we think that idempotent drops are more frequent than others
> and also they do not worth a warning log, in that case we can consider
> break this metric down with different tags for example.
>
> Guozhang
>
> On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <yo...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > Thanks for the votes so far!
> > @Matthias or @Guozhang Wang <gu...@confluent.io> it would be great to
> > also get your input on this KIP.
> >
> > It looks to be pretty close to completion, so the finishing touches are all
> > we need. :)
> >
> > Best,
> > Richard
> >
> > On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
> > Ghassan.Yammine@bazaarvoice.com> wrote:
> >
> > > Hello all,
> > >
> > > +1 (non-binding)
> > >
> > > Thanks,
> > >
> > > Ghassan
> > >
> > > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
> > >
> > >
> > >
> > >     Hi Richard,
> > >
> > >     +1 (non-binding)
> > >
> > >     Best,
> > >     Bruno
> > >
> > >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vv...@apache.org>
> > > wrote:
> > >     >
> > >     > Hi Richard,
> > >     >
> > >     > Thanks for the KIP!
> > >     >
> > >     > I'm +1 (binding)
> > >     >
> > >     > -john
> > >     >
> > >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > >     > > Hi all,
> > >     > >
> > >     > > I am proposing a new optimization to Kafka Streams which would
> > > greatly
> > >     > > reduce the number of idempotent updates (or no-ops) in the Kafka
> > > Streams
> > >     > > DAG.
> > >     > > A number of users have been interested in this feature, so it
> > > would be nice
> > >     > > to pass this one in.
> > >     > >
> > >     > > For information, the KIP is described below:
> > >     > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > >     > >
> > >     > > We aim to make Kafka Streams more efficient by adopting the "emit
> > > on
> > >     > > change" reporting strategy.
> > >     > >
> > >     > > Please cast your vote!
> > >     > >
> > >     > > Best,
> > >     > > Richard
> > >     > >
> > >
> > >
> > >
> >
>
>
> --
> -- Guozhang

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Richard,

Thanks for the KIP. I once reviewed it and was concerned about its effects
on stream time advancing. After reading the updated KIP I think it has
answered a lot of them already.

I have a couple minor comments still, otherwise I'm +1:

1) I want to clarify that for operations resulting in KTables (not only
aggregations, but consider KTable#filter that may also result in a new
KTable), even if we drop emissions to the downstream topics we would still
append to the corresponding changelog if the timestamp has changed. This is
because the timestamps on the changelog are read by the standby tasks, which
rely on them to infer their own stream time advancing.

2) About the metrics, in KIP-444 we are consolidating all types of
scenarios that can cause dropped records to the same metrics:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams

late-records-drop: INFO at processor node level, replaced by INFO
task-level "dropped-records".

skipped-records: INFO at thread and processor node level, replaced by INFO
task-level "dropped-records".

expired-window-record-drop: DEBUG at state store level, replaced by INFO
task-level "dropped-records".

The main idea is that instead of using different metrics to indicate
different types of scenarios, we use one metric, and users just alert on
that single metric. When the alert triggers, they can look into the log4j
output for its causes (we made sure that all sensor recordings of this
metric would be associated with a warning log4j entry).

So I'd suggest that instead of introducing a new per-node
"dropped-idempotent-updates", we just piggy-back on the existing task-level
metric; unless we think that idempotent drops are more frequent than others
and that they are not worth a warning log, in which case we can consider
breaking this metric down with different tags, for example.

Guozhang

On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <yo...@gmail.com>
wrote:

> Hi all,
>
> Thanks for the votes so far!
> @Matthias or @Guozhang Wang <gu...@confluent.io> it would be great to
> also get your input on this KIP.
>
> It looks to be pretty close to completion, so the finishing touches are all
> we need. :)
>
> Best,
> Richard
>
> On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
> Ghassan.Yammine@bazaarvoice.com> wrote:
>
> > Hello all,
> >
> > +1 (non-binding)
> >
> > Thanks,
> >
> > Ghassan
> >
> > On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
> >
> >
> >
> >     Hi Richard,
> >
> >     +1 (non-binding)
> >
> >     Best,
> >     Bruno
> >
> >     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vv...@apache.org>
> > wrote:
> >     >
> >     > Hi Richard,
> >     >
> >     > Thanks for the KIP!
> >     >
> >     > I'm +1 (binding)
> >     >
> >     > -john
> >     >
> >     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> >     > > Hi all,
> >     > >
> >     > > I am proposing a new optimization to Kafka Streams which would
> > greatly
> >     > > reduce the number of idempotent updates (or no-ops) in the Kafka
> > Streams
> >     > > DAG.
> >     > > A number of users have been interested in this feature, so it
> > would be nice
> >     > > to pass this one in.
> >     > >
> >     > > For information, the KIP is described below:
> >     > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >     > >
> >     > > We aim to make Kafka Streams more efficient by adopting the "emit
> > on
> >     > > change" reporting strategy.
> >     > >
> >     > > Please cast your vote!
> >     > >
> >     > > Best,
> >     > > Richard
> >     > >
> >
> >
> >
>


-- 
-- Guozhang

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Richard Yu <yo...@gmail.com>.
Hi all,

Thanks for the votes so far!
@Matthias or @Guozhang Wang <gu...@confluent.io> it would be great to
also get your input on this KIP.

It looks to be pretty close to completion, so the finishing touches are all
we need. :)

Best,
Richard

On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <
Ghassan.Yammine@bazaarvoice.com> wrote:

> Hello all,
>
> +1 (non-binding)
>
> Thanks,
>
> Ghassan
>
> On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
>
>
>
>     Hi Richard,
>
>     +1 (non-binding)
>
>     Best,
>     Bruno
>
>     On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vv...@apache.org>
> wrote:
>     >
>     > Hi Richard,
>     >
>     > Thanks for the KIP!
>     >
>     > I'm +1 (binding)
>     >
>     > -john
>     >
>     > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
>     > > Hi all,
>     > >
>     > > I am proposing a new optimization to Kafka Streams which would
> greatly
>     > > reduce the number of idempotent updates (or no-ops) in the Kafka
> Streams
>     > > DAG.
>     > > A number of users have been interested in this feature, so it
> would be nice
>     > > to pass this one in.
>     > >
>     > > For information, the KIP is described below:
>     > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>     > >
>     > > We aim to make Kafka Streams more efficient by adopting the "emit
> on
>     > > change" reporting strategy.
>     > >
>     > > Please cast your vote!
>     > >
>     > > Best,
>     > > Richard
>     > >
>
>
>

Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Ghassan Yammine <Gh...@bazaarvoice.com>.
Hello all,

+1 (non-binding)

Thanks,

Ghassan

On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:

    Hi Richard,
    
    +1 (non-binding)
    
    Best,
    Bruno
    
    On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vv...@apache.org> wrote:
    >
    > Hi Richard,
    >
    > Thanks for the KIP!
    >
    > I'm +1 (binding)
    >
    > -john
    >
    > On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
    > > Hi all,
    > >
    > > I am proposing a new optimization to Kafka Streams which would greatly
    > > reduce the number of idempotent updates (or no-ops) in the Kafka Streams
    > > DAG.
    > > A number of users have been interested in this feature, so it would be nice
    > > to pass this one in.
    > >
    > > For information, the KIP is described below:
    > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
    > >
    > > We aim to make Kafka Streams more efficient by adopting the "emit on
    > > change" reporting strategy.
    > >
    > > Please cast your vote!
    > >
    > > Best,
    > > Richard
    > >
    


Re: [VOTE] KIP-557: Add emit on change support for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Richard,

+1 (non-binding)

Best,
Bruno

On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vv...@apache.org> wrote:
>
> Hi Richard,
>
> Thanks for the KIP!
>
> I'm +1 (binding)
>
> -john
>
> On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> > Hi all,
> >
> > I am proposing a new optimization to Kafka Streams which would greatly
> > reduce the number of idempotent updates (or no-ops) in the Kafka Streams
> > DAG.
> > A number of users have been interested in this feature, so it would be nice
> > to pass this one in.
> >
> > For information, the KIP is described below:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >
> > We aim to make Kafka Streams more efficient by adopting the "emit on
> > change" reporting strategy.
> >
> > Please cast your vote!
> >
> > Best,
> > Richard
> >