Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2019/06/27 16:11:18 UTC

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Hello folks,

As 2.3 is released now, I'd like to bump up this KIP discussion again for
your reviews.


Guozhang


On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Patrik,
>
> Since we are rolling out 2.3 and everyone is busy with the release now,
> this KIP has not had much discussion yet and will slip into the
> next release cadence.
>
> This KIP contains several parts: 1. refactoring the existing
> metrics hierarchy to clean up some redundancy and gain more clarity; 2.
> adding instance-level metrics like rebalance and state metrics, as well as
> other static metrics.
>
>
> Guozhang
>
>
>
> On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pk...@gmail.com> wrote:
>
>> Hi Guozhang
>> Thanks for the KIP, this looks very helpful.
>> Could you please provide more detail on the metrics planned for the state?
>> We were just considering how to implement this ourselves because we need
>> to
>> track the history of state changes.
>> The idea was to have an accumulated "seconds in state x" metric for every
>> state.
>> The new rebalance metric might solve part of our use case, but it is
>> interesting what you have planned for the state metric.
>> best regards
>> Patrik
>>
>> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wa...@gmail.com> wrote:
>>
>> > Hello folks,
>> >
>> > I'd like to propose the following KIP to improve the Kafka Streams
>> metrics
>> > mechanism to users. This includes 1) a minor change in the public
>> > StreamsMetrics API, and 2) a major cleanup on the Streams' own built-in
>> > metrics hierarchy.
>> >
>> > Details can be found here:
>> >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
>> >
>> > I'd love to hear your thoughts and feedback. Thanks!
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang
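
A minimal sketch of the accumulated "seconds in state x" idea Patrik describes above, assuming the application records state transitions itself. `StateTimeTracker` and its methods are hypothetical names, not part of Kafka Streams or KIP-444, and states are plain strings so the example runs without Kafka on the classpath. In a real application, `onChange` could be called from a `KafkaStreams.StateListener` registered via `KafkaStreams#setStateListener`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Kafka Streams or KIP-444.
class StateTimeTracker {
    private final Map<String, Long> millisInState = new HashMap<>();
    private String currentState;
    private long enteredAtMs;

    public StateTimeTracker(String initialState, long nowMs) {
        this.currentState = initialState;
        this.enteredAtMs = nowMs;
    }

    // Call on every state transition: close the interval of the state
    // being left, then start timing the new state.
    public synchronized void onChange(String newState, long nowMs) {
        millisInState.merge(currentState, nowMs - enteredAtMs, Long::sum);
        currentState = newState;
        enteredAtMs = nowMs;
    }

    // Accumulated seconds spent in the given state, including the
    // still-open interval if it is the current state.
    public synchronized double secondsIn(String state, long nowMs) {
        long total = millisInState.getOrDefault(state, 0L);
        if (state.equals(currentState)) {
            total += nowMs - enteredAtMs;
        }
        return total / 1000.0;
    }

    public static void main(String[] args) {
        StateTimeTracker tracker = new StateTimeTracker("CREATED", 0L);
        tracker.onChange("REBALANCING", 1_000L);
        tracker.onChange("RUNNING", 4_000L);
        tracker.onChange("REBALANCING", 10_000L);
        // Two rebalances: 3 s (closed) plus 2 s (still open at t = 12 s).
        System.out.println(tracker.secondsIn("REBALANCING", 12_000L));
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic; a production version would more likely read a monotonic clock and expose each total as a gauge.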

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Works for me.

(Btw: I did not really vote, just stated my overall support :P )

On 9/5/19 2:50 PM, John Roesler wrote:
> Thanks, all.
> 
> FWIW, the most recent formulation from Guozhang + Bruno's addendum would
> have my support.
> 
> Thanks,
> -John
> 
> On Thu, Sep 5, 2019 at 4:05 PM Bruno Cadonna <br...@confluent.io> wrote:
> 
>> Hi Guozhang,
>>
>> Your summary corresponds to my proposal.
>>
>> A new value would only be added if in future we change the metrics in
>> a backward-incompatible way, i.e., 2.4-<the version before the new
>> breaking change>. "latest" will always stay the default.
>>
>> Best,
>> Bruno
>>
>> On Thu, Sep 5, 2019 at 10:57 PM Guozhang Wang <wa...@gmail.com> wrote:
>>>
>>> Hello Bruno,
>>>
>>> I think your concern makes sense, let's adopt this suggestion in KIP-444
>>> instead. Just to clarify:
>>>
>>> 1. The default value would be "latest".
>>> 2. The only other valid value is "0.10.0-2.3".
>>>
>>> And moving forward this config may stay without any new values.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Thu, Sep 5, 2019 at 12:16 PM Bruno Cadonna <br...@confluent.io>
>> wrote:
>>>
>>>> Hi Guozhang,
>>>>
>>>> I think user experience and code maintenance are tightly related. The
>>>> harder the code is to maintain, the worse the user experience will get.
>>>>
>>>> Making the config optional does not solve the issue. Wouldn't users be
>>>> puzzled when we release 2.5 and they cannot set
>>>> built.in.metrics.version to 2.4 to be sure to get the same metrics for
>>>> that version? It seems with that solution we would just move
>>>> maintenance to the next release.
>>>>
>>>> I cannot see how the values `latest` and `0.10.0-2.3` would lead to a
>>>> bad user experience.
>>>>
>>>> Regarding testing, at least on integration test level, we absolutely
>>>> need to test all versions. It is too easy to make a mistake with so
>>>> many versions. Remember that on integration test level we need to
>>>> start an embedded Kafka for each single test.
>>>>
>>>> Best,
>>>> Bruno
>>>>
>>>> On Thu, Sep 5, 2019 at 8:46 PM Guozhang Wang <wa...@gmail.com>
>> wrote:
>>>>>
>>>>> Hi Bruno,
>>>>>
>>>>> Thanks for raising this point. I think the main motivation behind
>> this
>>>>> proposal is, like Matthias said, to ease the understanding burden
>> from
>>>>> users to our own shoulders. Testing wise, I think we do not
>> necessarily
>>>>> need to explode the testing matrix but just test the last version
>> before
>>>>> each metrics refactoring (again, hopefully it is the only time) and
>>>> hence I
>>>>> think it is worth it to benefit the user experience. WDYT?
>>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Thanks for your feedback, I will update the wiki page accordingly.
>>>>>
>>>>> Will also close the other voting thread with your vote.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Thu, Sep 5, 2019 at 11:27 AM Matthias J. Sax <
>> matthias@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I share Bruno's concern about future releases, however, I would
>> make
>>>>>> a slightly different proposal.
>>>>>>
>>>>>> Instead of using "latest" we can just make the config optional and
>> if
>>>>>> not set, we use the new metrics code? This way we don't need to
>> add a
>>>>>> new version number each time we do a new release (note, that it
>> would
>>>> be
>>>>>> weird to keep default value "2.4" in future releases).
>>>>>>
>>>>>> For enabling backward compatibility: I don't have a strong opinion
>> if
>>>> we
>>>>>> should have a single value "0.10.0-2.3" or list each version
>>>> individually.
>>>>>>
>>>>>> In KIP-268 (fixing metadata upgrade) we decided to list each
>> version
>>>>>> individually as it seems simpler for users. Also, we wanted to hide
>>>>>> which release uses which metadata version (v0 in 0.10.0, and v1 in
>>>>>> 0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a
>> single
>>>>>> value though but it seemed not to give best user experience.
>>>>>>
>>>>>> I think this KIP is a little different though and both options
>> seem to
>>>>>> be valid. However, I would like to emphasize that we should
>> optimize
>>>> for
>>>>>> user experience (and not if it's harder/easier to test etc---in
>> doubt,
>>>>>> we should always take on the burden if it helps to lift the burden
>> from
>>>>>> users).
>>>>>>
>>>>>> Overall, I am +1
>>>>>>
>>>>>> Some nits:
>>>>>>
>>>>>> (1) I think the motivation section for updating `StreamsMetrics`
>>>>>> interface does not make it clear why we need the change. What is
>> the
>>>>>> issue with the current interface and how do the new methods address
>> the
>>>>>> issue?
>>>>>>
>>>>>> (2) The metric name `put | put-if-absent .. | get-latency (avg |
>> max)`
>>>>>> is hard to read because it indicates that there is a `get-latency`
>>>> method
>>>>>> call on stores -- can we update it to
>>>>>>
>>>>>> `(put | put-if-absent .. | get)-latency (avg | max)`
>>>>>>
>>>>>> (3) typo: `When users override it to "2.2" or below,` this should
>> be
>>>>>> "2.3" -- or maybe even different if Bruno's concern gets addressed.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 9/4/19 12:26 PM, Bruno Cadonna wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am sorry to restart the discussion here, but I came across a
>> small
>>>>>>> issue in the KIP.
>>>>>>>
>>>>>>> I started to implement KIP-444 and I am a bit concerned about the
>>>> values
>>>>>>> for the config `built.in.metrics.version`. In the KIP the
>>>> possible
>>>>>>> values are specified as all Kafka Streams versions. I think that
>> this
>>>>>>> set of values is really hard to maintain in the code and it also
>>>> blows
>>>>>>> up the testing burden unnecessarily because all versions need to
>> be
>>>>>>> tested. My proposal (backed by John) is to use the following
>> values:
>>>>>>> - `latest` for the latest version of the metrics
>>>>>>> - `0.10.0-2.3` for the version before `latest`
>>>>>>> If in future, let's say in version 4.1, we need again to change
>> the
>>>>>>> metrics, we would add `2.4-4.0` to the values of the config. With
>>>>>>> major versions, we could also get rid of some values.
>>>>>>>
>>>>>>> WDYT?
>>>>>>>
>>>>>>> You can also have a look at the PR
>>>>>>> https://github.com/apache/kafka/pull/7279 to see this in code.
>>>>>>>
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>>
>>>>>>> On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <
>> wangguoz@gmail.com>
>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hello Bruno,
>>>>>>>>
>>>>>>>> I've updated the wiki page again per your comments, here's a
>> brief
>>>>>> summary:
>>>>>>>>
>>>>>>>> 1. added the list of removed metrics.
>>>>>>>> 2. added a task-level INFO metric "dropped-records" that covers
>> all
>>>>>>>> scenarios and merges in the existing "late-records-drop",
>>>>>>>> "skipped-records", and "expired-window-records-drop".
>>>>>>>> 3. renamed the util functions of StreamsMetrics as
>>>> `addLatencyRateTotal`
>>>>>>>> and `addRateTotal` sensors.
>>>>>>>>
>>>>>>>>
>>>>>>>> Since I feel it has incorporated all of your comments I'm going
>> to
>>>> start
>>>>>>>> the vote thread for this KIP now.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <
>> wangguoz@gmail.com>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Bruno,
>>>>>>>>>
>>>>>>>>> No it was not intentional, and we can definitely add the total
>>>> amount
>>>>>>>>> sensor as well -- they are just util functions to save users
>> some
>>>>>> lines of
>>>>>>>>> code anyways, and should be straightforward.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <
>> bruno@confluent.io>
>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>
>>>>>>>>>> I totally missed the total invocation count metric in the
>> javadoc.
>>>>>>>>>> Which brings me to a follow-up question. Should the names of
>> the
>>>>>>>>>> methods reflect the included total invocation count? We have
>> to
>>>> rename
>>>>>>>>>> them anyways. One option would be to simply add `Total` to the
>>>> method
>>>>>>>>>> names, i.e., `addLatencyAndRateAndTotalSensor` and
>>>>>>>>>> `addRateAndTotalSensor` (alternatively without the `And`s).
>> Since
>>>>>>>>>> those sensors record exclusively invocations, another option
>>>> would be
>>>>>>>>>> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
>>>>>>>>>>
>>>>>>>>>> As far as I can see, we have sensors to record invocations but
>>>> none to
>>>>>>>>>> record amounts. Is that intentional? No need to add it to this
>>>> KIP, I
>>>>>>>>>> am just curious.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Bruno
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <
>> wangguoz@gmail.com
>>>>>
>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Bruno,
>>>>>>>>>>>
>>>>>>>>>>> Just realized that for `addRateSensor` and
>>>> `addLatencyAndRateSensor`
>>>>>>>>>> we've
>>>>>>>>>>> actually added the total invocation metric already.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <
>>>> wangguoz@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Bruno,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <
>>>> bruno@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I left my comments inline.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <
>>>> wangguoz@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Bruno,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the feedback, replied inline.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <
>>>> bruno@confluent.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you for the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) As far as I understand, the StreamsMetrics interface
>> is
>>>> there
>>>>>>>>>> for
>>>>>>>>>>>>>>> user-defined processors. Would it make sense to also add
>> a
>>>>>>>>>> method to
>>>>>>>>>>>>>>> the interface to specify a sensor that records skipped
>>>> records?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Not sure I follow.. if users want to add a specific
>> skipped
>>>>>>>>>> records
>>>>>>>>>>>>>> sensor, she can still do that as a "throughput" sensor
>> via "
>>>>>>>>>>>>>> addThroughputSensor" and then "record" right?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As an after-thought, maybe it's better to rename
>> `throughput`
>>>> to
>>>>>>>>>> `rate`
>>>>>>>>>>>>> in
>>>>>>>>>>>>>> the public APIs since it is really meant for the latter
>>>> semantics.
>>>>>>>>>> I did
>>>>>>>>>>>>>> not change it just to make fewer API changes / deprecate
>> fewer
>>>>>>>>>> functions.
>>>>>>>>>>>>>> But if we feel it is important we can change it as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I see now that a user can record the rate of skipped
>> records.
>>>>>>>>>> However,
>>>>>>>>>>>>> I was referring to the total number of skipped records.
>> Maybe
>>>> my
>>>>>>>>>>>>> question should be more general: should we allow the user
>> to
>>>> also
>>>>>>>>>>>>> specify sensors for totals or combinations of rate and
>> totals?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sounds good to me, I will add it to the wiki page as well
>> for
>>>>>>>>>>>> StreamsMetrics.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding the naming, I like `rate` more than `throughput`,
>>>> but I
>>>>>>>>>>>>> would not fight for it.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2) What are the semantics of active-task-process and
>>>>>>>>>>>>> standby-task-process
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ah good catch, I think I made it in the wrong column.
>> Just
>>>> some
>>>>>>>>>>>>>> explanations here: Within a thread's looped iterations, it
>>>> will
>>>>>>>>>> first
>>>>>>>>>>>>> try
>>>>>>>>>>>>>> to process some records from the active tasks, and then
>> see if
>>>>>>>>>> there are
>>>>>>>>>>>>>> any standby-tasks that can be processed as well (i.e. just
>>>> reading
>>>>>>>>>> from
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> restore consumer and apply to the local stores). The ratio
>>>> metrics
>>>>>>>>>> are
>>>>>>>>>>>>> for
>>>>>>>>>>>>>> indicating 1) what tasks (active or standby) does this
>> thread
>>>> own
>>>>>>>>>> so
>>>>>>>>>>>>> far,
>>>>>>>>>>>>>> and 2) how much time in percentage does it spend on each
>> of
>>>> them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But this metric should really be a task-level one that
>>>> includes
>>>>>>>>>> both the
>>>>>>>>>>>>>> thread-id and task-id, and upon task migrations they will
>> be
>>>>>>>>>> dynamically
>>>>>>>>>>>>>> deleted / (re)-created. For each task-id it may be owned
>> by
>>>>>>>>>> multiple
>>>>>>>>>>>>>> threads as one active and others standby, and hence the
>>>> separation
>>>>>>>>>> of
>>>>>>>>>>>>>> active / standby seems still necessary.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Makes sense.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3) How do dropped-late-records and
>> expired-window-record-drop
>>>>>>>>>> relate
>>>>>>>>>>>>>>> to each other? I guess the former is for records that
>> fall
>>>>>>>>>> outside the
>>>>>>>>>>>>>>> grace period and the latter is for records that are
>> processed
>>>>>>>>>> after
>>>>>>>>>>>>>>> the retention period of the window. Is this correct?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, that's correct. The names are indeed a bit confusing
>>>> since
>>>>>>>>>> they
>>>>>>>>>>>>> are
>>>>>>>>>>>>>> added at different releases historically..
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> More precisely, the `grace period` is a notion of the
>> operator
>>>>>>>>>> (hence
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> metric is node-level, though it would only be used for DSL
>>>>>>>>>> operators)
>>>>>>>>>>>>> while
>>>>>>>>>>>>>> the `retention` is a notion of the store (hence the
>> metric is
>>>>>>>>>>>>> store-level).
>>>>>>>>>>>>>> Usually grace period will be smaller than store retention
>>>> though.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Processor node is aware of `grace period` and when
>> it receives a
>>>>>>>>>> record
>>>>>>>>>>>>> that
>>>>>>>>>>>>>> is older than grace deadline, it will be dropped
>> immediately;
>>>>>>>>>> otherwise
>>>>>>>>>>>>> it
>>>>>>>>>>>>>> will still be processed and maybe a new update is "put"
>> into the
>>>>>>>>>> store.
>>>>>>>>>>>>> The
>>>>>>>>>>>>>> store is aware of its `retention period` and then upon a
>> "put"
>>>>>>>>>> call if
>>>>>>>>>>>>> it
>>>>>>>>>>>>>> realized it is older than the retention deadline, that put
>>>> call
>>>>>>>>>> would be
>>>>>>>>>>>>>> ignored and metric is recorded.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We have to separate them here since the window store can
>> be
>>>> used
>>>>>>>>>> in both
>>>>>>>>>>>>>> DSL and PAPI, and for the former case it would likely
>> be
>>>>>> already
>>>>>>>>>>>>> ignored
>>>>>>>>>>>>>> at the processor node level due to the grace period which
>> is
>>>>>>>>>> usually
>>>>>>>>>>>>>> smaller than retention; but for PAPI there's no grace
>> period
>>>> and
>>>>>>>>>> hence
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> processor would likely still process and call "put" on the
>>>> store.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Alright! Got it!
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 4) Is there an actual difference between skipped and
>> dropped
>>>>>>>>>> records?
>>>>>>>>>>>>>>> If not, shall we unify the terminology?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There is. Dropped records are only due to lateness;
>> whereas
>>>>>>>>>> skipped
>>>>>>>>>>>>>> records can be due to serde errors (and user's error
>> handling
>>>>>>>>>> indicates
>>>>>>>>>>>>>> "skip and continue"), timestamp errors, etc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've considered maybe a better (more extensible) way
>> would be
>>>>>>>>>> defining a
>>>>>>>>>>>>>> single metric name, say skipped-records, but use different
>>>> tags to
>>>>>>>>>>>>> indicate
>>>>>>>>>>>>>> its skipping reason (errors, windowing semantics,
>> etc). But
>>>>>>>>>> there's
>>>>>>>>>>>>>> still a tricky difference: for serde caused skipping for
>>>> example,
>>>>>>>>>> they
>>>>>>>>>>>>> will
>>>>>>>>>>>>>> be skipped at the very beginning and there's no effects
>> taken
>>>> at
>>>>>>>>>> all.
>>>>>>>>>>>>> For
>>>>>>>>>>>>>> some others e.g. null-key / value at the reduce operator,
>> it
>>>> is
>>>>>>>>>> only
>>>>>>>>>>>>>> skipped at the middle of the processing, i.e. some
>> effects may
>>>>>> have
>>>>>>>>>>>>> already
>>>>>>>>>>>>>> been taken in up-stream sub-topologies. And that's why for
>>>>>>>>>>>>> skipped-records
>>>>>>>>>>>>>> I've defined it on both task-level and node-level and the
>>>>>>>>>> aggregate of
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> latter may still be smaller than the former, whereas for
>>>>>>>>>>>>> dropped-records it
>>>>>>>>>>>>>> is only for node-level.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So how about an even more significant change then: we
>> enlarge
>>>> the
>>>>>>>>>>>>>> `dropped-late-records` to `dropped-records` which is
>>>> node-level
>>>>>>>>>> only,
>>>>>>>>>>>>> but
>>>>>>>>>>>>>> includes reasons from lateness to semantics (like
>> null-key) as
>>>>>>>>>> well; and
>>>>>>>>>>>>>> then we have a task-level-only `skipped-records` which
>> only
>>>> record
>>>>>>>>>> those
>>>>>>>>>>>>>> dropped at the very beginning and did not make it at all
>> to
>>>> the
>>>>>>>>>>>>> processing
>>>>>>>>>>>>>> topology. I feel this is a clearer distinction but
>> also a
>>>>>>>>>> bigger
>>>>>>>>>>>>> change
>>>>>>>>>>>>>> to users.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I like the way dropped-records and skipped-records are
>> now
>>>>>>>>>>>>> defined. My follow-up question is whether we should give
>> names
>>>> to
>>>>>>>>>>>>> those metrics that better describe their semantics, like:
>>>>>>>>>>>>>
>>>>>>>>>>>>> dropped-records-at-source and dropped-records-at-processor
>>>>>>>>>>>>>
>>>>>>>>>>>>> or
>>>>>>>>>>>>>
>>>>>>>>>>>>> records-dropped-at-source and records-dropped-at-processor
>>>>>>>>>>>>>
>>>>>>>>>>>>> or
>>>>>>>>>>>>>
>>>>>>>>>>>>> source-dropped-records and processor-dropped-records
>>>>>>>>>>>>>
>>>>>>>>>>>>> or alternatively with skipped. However, I would use the
>> same
>>>> term
>>>>>> as
>>>>>>>>>>>>> in expired-window-record-drop
>>>>>>>>>>>>>
>>>>>>>>>>>>> Maybe we should also consider renaming
>>>> expired-window-record-drop
>>>>>>>>>> to
>>>>>>>>>>>>> expired-window-record-dropped to be consistent.
>>>>>>>>>>>>>
>>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was not considering "expired-window-record-drop" before
>>>> since it
>>>>>>>>>> is a
>>>>>>>>>>>> store-level metric, and I was only considering task-level
>>>>>>>>>> (skipped-records)
>>>>>>>>>>>> and processor-node-level (dropped-records) metrics, and I'm
>>>> using
>>>>>>>>>> different
>>>>>>>>>>>> terms deliberately to hint users that they are different
>> leveled
>>>>>>>>>> metrics.
>>>>>>>>>>>>
>>>>>>>>>>>> I still feel that using `skip` for task-level metrics
>> indicating
>>>>>> that
>>>>>>>>>> this
>>>>>>>>>>>> record was not processed at all, and using `drop` for
>>>>>> processor-level
>>>>>>>>>>>> metrics that this record is only dropped at this stage of
>> the
>>>>>>>>>> topology is a
>>>>>>>>>>>> better one; but I'm also okay with some finer grained
>> metrics so
>>>>>> that
>>>>>>>>>> we
>>>>>>>>>>>> can align the processor-level with store-level (they are on
>> the
>>>> same
>>>>>>>>>>>> granularity any ways), like:
>>>>>>>>>>>>
>>>>>>>>>>>> `dropped-records-null-field`: at processor nodes
>>>>>>>>>>>>
>>>>>>>>>>>> `dropped-records-too-late`: at processor nodes
>>>>>>>>>>>>
>>>>>>>>>>>> `dropped-records-expired-window`: at window-stores
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 5) What happens with removed metrics when the user sets
>> the
>>>>>>>>>> version of
>>>>>>>>>>>>>>> "built.in.metrics.version" to 2.2-
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think for those redundant ones like "forward-rate" and
>>>>>>>>>>>>> "destroy-rate"
>>>>>>>>>>>>>> we can still remove them with 2.2- as well; for other ones
>>>> that
>>>>>> are
>>>>>>>>>>>>> removed
>>>>>>>>>>>>>> / replaced like thread-level skipped-records we should
>> still
>>>>>>>>>> maintain
>>>>>>>>>>>>> them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could you add this comment about removal of redundant
>> metrics
>>>> to
>>>>>> the
>>>>>>>>>>>>> KIP such that it is documented somewhere?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, for sure.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I've also decided to remove the rebalance-related metrics
>> from
>>>> the
>>>>>>>>>>>> instance-level and move it to consumer itself as part of
>>>> KIP-429.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>
> 
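
The two drop points discussed in the quoted thread (the grace period checked at the processor node, the retention period checked by the window store on "put") can be sketched as follows. This is a deliberately simplified model: `DropPoints`, `Outcome`, and `handle` are hypothetical names, the check compares a raw record timestamp against stream time, and the actual Kafka Streams implementation is more involved.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of the two drop points; not Kafka Streams code.
class DropPoints {
    enum Outcome { DROPPED_AT_PROCESSOR, DROPPED_AT_STORE, STORED }

    // An empty gracePeriodMs models a PAPI processor, which has no grace period.
    public static Outcome handle(long recordTsMs, long streamTimeMs,
                                 OptionalLong gracePeriodMs, long retentionMs) {
        // Processor-node check: a record older than the grace deadline
        // is dropped immediately and never reaches the store.
        if (gracePeriodMs.isPresent()
                && recordTsMs < streamTimeMs - gracePeriodMs.getAsLong()) {
            return Outcome.DROPPED_AT_PROCESSOR;
        }
        // Store check: the "put" is ignored if the record is older than
        // the retention deadline.
        if (recordTsMs < streamTimeMs - retentionMs) {
            return Outcome.DROPPED_AT_STORE;
        }
        return Outcome.STORED;
    }

    public static void main(String[] args) {
        long streamTime = 100_000L;
        long retention = 60_000L;
        OptionalLong grace = OptionalLong.of(10_000L); // DSL operator
        OptionalLong noGrace = OptionalLong.empty();   // PAPI processor

        System.out.println(handle(95_000L, streamTime, grace, retention));
        System.out.println(handle(80_000L, streamTime, grace, retention));
        System.out.println(handle(80_000L, streamTime, noGrace, retention));
        System.out.println(handle(30_000L, streamTime, noGrace, retention));
    }
}
```

When the grace period is smaller than the retention period, a late DSL record is caught by the first check and never reaches the store, while a PAPI processor only ever hits the store-level check. That asymmetry is the reason given in the thread for keeping the two levels distinct.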


Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by John Roesler <jo...@confluent.io>.
Thanks, all.

FWIW, the most recent formulation from Guozhang + Bruno's addendum would
have my support.

Thanks,
-John
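
For reference, the formulation supported here (default "latest", with "0.10.0-2.3" as the only other valid value of `built.in.metrics.version`) could be validated roughly as below. This is a sketch using plain `java.util.Properties`; `MetricsVersionConfig` and `resolve` are hypothetical names, and the actual Kafka Streams config handling may differ.

```java
import java.util.Properties;

// Hypothetical validation sketch; not the Kafka Streams implementation.
class MetricsVersionConfig {
    public static final String CONFIG = "built.in.metrics.version";
    public static final String LATEST = "latest";
    public static final String METRICS_0100_TO_23 = "0.10.0-2.3";

    // Returns the effective metrics version, falling back to "latest"
    // when the config is not set, and rejecting any other value.
    public static String resolve(Properties props) {
        String value = props.getProperty(CONFIG, LATEST);
        if (!LATEST.equals(value) && !METRICS_0100_TO_23.equals(value)) {
            throw new IllegalArgumentException(
                "Invalid value '" + value + "' for " + CONFIG
                    + "; expected '" + LATEST + "' or '" + METRICS_0100_TO_23 + "'");
        }
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolve(props)); // config unset, default applies

        props.setProperty(CONFIG, METRICS_0100_TO_23);
        System.out.println(resolve(props)); // opt in to the pre-2.4 metrics
    }
}
```

Under this scheme an individual release number such as "2.4" is rejected; a new range value would only be introduced if the metrics change again in a backward-incompatible way.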

On Thu, Sep 5, 2019 at 4:05 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> Your summary corresponds to my proposal.
>
> A new value would only be added if in future we change the metrics in
> a backward-incompatible way, i.e., 2.4-<the version before the new
> breaking change>. "latest" will always stay the default.
>
> Best,
> Bruno
>
> On Thu, Sep 5, 2019 at 10:57 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > Hello Bruno,
> >
> > I think your concern makes sense, let's adopt this suggestion in KIP-444
> > instead. Just to clarify:
> >
> > 1. The default value would be "latest".
> > 2. The only other valid value is "0.10.0-2.3".
> >
> > And moving forward this config may stay without any new values.
> >
> >
> > Guozhang
> >
> >
> > On Thu, Sep 5, 2019 at 12:16 PM Bruno Cadonna <br...@confluent.io>
> wrote:
> >
> > > Hi Guozhang,
> > >
> > > I think user experience and code maintenance are tightly related. The
> > > harder the code is to maintain, the worse the user experience will get.
> > >
> > > Making the config optional does not solve the issue. Wouldn't users be
> > > puzzled when we release 2.5 and they cannot set
> > > built.in.metrics.version to 2.4 to be sure to get the same metrics for
> > > that version? It seems with that solution we would just move
> > > maintenance to the next release.
> > >
> > > I cannot see how the values `latest` and `0.10.0-2.3` would lead to a
> > > bad user experience.
> > >
> > > Regarding testing, at least on integration test level, we absolutely
> > > need to test all versions. It is too easy to make a mistake with so
> > > many versions. Remember that on integration test level we need to
> > > start an embedded Kafka for each single test.
> > >
> > > Best,
> > > Bruno
> > >
> > > On Thu, Sep 5, 2019 at 8:46 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> > > >
> > > > Hi Bruno,
> > > >
> > > > Thanks for raising this point. I think the main motivation behind
> this
> > > > proposal is, like Matthias said, to ease the understanding burden
> from
> > > > users to our own shoulders. Testing wise, I think we do not
> necessarily
> > > > need to explode the testing matrix but just test the last version
> before
> > > > each metrics refactoring (again, hopefully it is the only time) and
> > > hence I
> > > > think it is worth it to benefit the user experience. WDYT?
> > > >
> > > > Hi Matthias,
> > > >
> > > > Thanks for your feedback, I will update the wiki page accordingly.
> > > >
> > > > Will also close the other voting thread with your vote.
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Sep 5, 2019 at 11:27 AM Matthias J. Sax <
> matthias@confluent.io>
> > > > wrote:
> > > >
> > > > > I share Bruno's concern about future releases, however, I would
> make
> > > > > a slightly different proposal.
> > > > >
> > > > > Instead of using "latest" we can just make the config optional and
> if
> > > > > not set, we use the new metrics code? This way we don't need to
> add a
> > > > > new version number each time we do a new release (note, that it
> would
> > > be
> > > > > weird to keep default value "2.4" in future releases).
> > > > >
> > > > > For enabling backward compatibility: I don't have a strong opinion
> if
> > > we
> > > > > should have a single value "0.10.0-2.3" or list each version
> > > individually.
> > > > >
> > > > > In KIP-268 (fixing metadata upgrade) we decided to list each
> version
> > > > > individually as it seems simpler for users. Also, we wanted to hide
> > > > > which release uses which metadata version (v0 in 0.10.0, and v1 in
> > > > > 0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a
> single
> > > > > value though but it seemed not to give best user experience.
> > > > >
> > > > > I think this KIP is a little different though and both options
> seem to
> > > > > be valid. However, I would like to emphasize that we should
> optimize
> > > for
> > > > > user experience (and not if it's harder/easier to test etc---in
> doubt,
> > > > > we should always take on the burden if it helps to lift the burden
> from
> > > > > users).
> > > > >
> > > > > Overall, I am +1
> > > > >
> > > > > Some nits:
> > > > >
> > > > > (1) I think the motivation section for updating `StreamsMetrics`
> > > > > interface does not make it clear why we need the change. What is
> the
> > > > > issue with the current interface and how do the new methods address
> the
> > > > > issue?
> > > > >
> > > > > (2) The metric name `put | put-if-absent .. | get-latency (avg |
> max)`
> > > > > is hard to read because it indicates that there is a `get-latency`
> > > method
> > > > > call on stores -- can we update it to
> > > > >
> > > > > `(put | put-if-absent .. | get)-latency (avg | max)`
> > > > >
> > > > > (3) typo: `When users override it to "2.2" or below,` this should
> be
> > > > > "2.3" -- or maybe even different if Bruno's concern gets addressed.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On 9/4/19 12:26 PM, Bruno Cadonna wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I am sorry to restart the discussion here, but I came across a
> small
> > > > > > issue in the KIP.
> > > > > >
> > > > > > I started to implement KIP-444 and I am a bit concerned about the
> > > values
> > > > > > for the config `built.in.metrics.version`. In the KIP the
> > > possible
> > > > > > values are specified as all Kafka Streams versions. I think that
> this
> > > > > > set of values is really hard to maintain in the code and it also
> > > blows
> > > > > > up the testing burden unnecessarily because all versions need to
> be
> > > > > > tested. My proposal (backed by John) is to use the following
> values:
> > > > > > - `latest` for the latest version of the metrics
> > > > > > - `0.10.0-2.3` for the version before `latest`
> > > > > > If in future, let's say in version 4.1, we need again to change
> the
> > > > > > metrics, we would add `2.4-4.0` to the values of the config. With
> > > > > > major versions, we could also get rid of some values.
> > > > > >
> > > > > > WDYT?
> > > > > >
> > > > > > You can also have a look at the PR
> > > > > > https://github.com/apache/kafka/pull/7279 to see this in code.
> > > > > >
> > > > > > Best,
> > > > > > Bruno
> > > > > >
> > > > > > On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <
> wangguoz@gmail.com>
> > > > > wrote:
> > > > > >>
> > > > > >> Hello Bruno,
> > > > > >>
> > > > > >> I've updated the wiki page again per your comments, here's a
> brief
> > > > > summary:
> > > > > >>

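The thread distinguishes two places where a late record can be discarded: at the processor node, which knows the grace period, and at the window store, which knows its retention period. A minimal Java sketch of that decision follows. The class, the method, and the exact deadline comparisons are hypothetical simplifications for illustration, not Kafka Streams code.

```java
// Hypothetical model, not Kafka Streams code: shows where a late record is
// discarded depending on the grace period (operator) and retention (store).
final class LateRecordModel {
    enum Outcome { DROPPED_AT_PROCESSOR, DROPPED_AT_STORE, STORED }

    private LateRecordModel() { }

    // windowEndMs:  end of the window the record falls into.
    // streamTimeMs: highest record timestamp observed so far.
    // graceMs:      operator-level grace period, or null for a PAPI processor
    //               that has no notion of grace.
    // retentionMs:  store-level retention period.
    static Outcome classify(long windowEndMs, long streamTimeMs, Long graceMs, long retentionMs) {
        // The processor node checks the grace deadline first (DSL operators only).
        if (graceMs != null && windowEndMs + graceMs < streamTimeMs) {
            return Outcome.DROPPED_AT_PROCESSOR;
        }
        // Otherwise the record reaches the store, which checks its retention deadline.
        if (windowEndMs + retentionMs < streamTimeMs) {
            return Outcome.DROPPED_AT_STORE;
        }
        return Outcome.STORED;
    }
}
```

With a grace period set (the DSL case), the processor check fires first. Passing `null` for the grace period models a PAPI processor, where only the store's retention check can discard the record.
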
Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Guozhang,

Your summary corresponds to my proposal.

A new value would only be added if, in the future, we change the metrics in
a backward-incompatible way, i.e., 2.4-<the version before the new
breaking change>. "latest" will always stay the default.

Best,
Bruno
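
To make the two-value scheme concrete, here is a minimal sketch of how the config could be resolved. The class and method names are hypothetical and this is not the code from the PR; only the config name `built.in.metrics.version` and the values `latest` and `0.10.0-2.3` come from the discussion.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams: models the two-value scheme
// discussed in this thread for the `built.in.metrics.version` config.
final class BuiltInMetricsVersion {
    static final String LATEST = "latest";            // default value
    static final String V_0100_TO_23 = "0.10.0-2.3";  // metrics as they were before this KIP

    private static final Set<String> VALID =
        new LinkedHashSet<>(Arrays.asList(LATEST, V_0100_TO_23));

    private BuiltInMetricsVersion() { }

    // Returns the effective version: the default when the config is unset,
    // otherwise the configured value if it is one of the known range names.
    static String resolve(String configured) {
        if (configured == null) {
            return LATEST;
        }
        if (!VALID.contains(configured)) {
            throw new IllegalArgumentException(
                "Invalid built.in.metrics.version '" + configured + "', expected one of " + VALID);
        }
        return configured;
    }
}
```

Under this sketch, a later breaking change would add one more range name (for example `2.4-4.0`) to the set of valid values, while `latest` stays the default.
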

On Thu, Sep 5, 2019 at 10:57 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> Hello Bruno,
>
> I think your concern makes sense, let's adopt this suggestion in KIP-444
> instead. Just to clarify:
>
> 1. The default value would be "latest".
> 2. The only other valid value is "0.10.0-2.3".
>
> And moving forward this config may stay without any new values.
>
>
> Guozhang
>
>
> On Thu, Sep 5, 2019 at 12:16 PM Bruno Cadonna <br...@confluent.io> wrote:
>
> > Hi Guozhang,
> >
> > I think user experience and code maintenance are tightly related. The
> > harder the code is to maintain, the worse the user experience will get.
> >
> > Making the config optional does not solve the issue. Wouldn't users be
> > puzzled when we release 2.5 and they cannot set
> > built.in.metrics.version to 2.4 to be sure to get the same metrics for
> > that version? It seems with that solution we would just move
> > maintenance to the next release.
> >
> > I cannot see how the values `latest` and `0.10.0-2.3` would lead to a
> > bad user experience.
> >
> > Regarding testing, at least on integration test level, we absolutely
> > need to test all versions. It is too easy to make a mistake with so
> > many versions. Remember that on integration test level we need to
> > start an embedded Kafka for each single test.
> >
> > Best,
> > Bruno
> >
> > On Thu, Sep 5, 2019 at 8:46 PM Guozhang Wang <wa...@gmail.com> wrote:
> > >
> > > Hi Bruno,
> > >
> > > Thanks for raising this point. I think the main motivation behind this
> > > proposal is, like Matthias said, to shift the burden of understanding from
> > > users onto our own shoulders. Testing-wise, I think we do not necessarily
> > > need to explode the testing matrix; we can just test the last version before
> > > each metrics refactoring (again, hopefully this is the only time), and hence
> > > I think it is worth it for the benefit of the user experience. WDYT?
> > >
> > > Hi Matthias,
> > >
> > > Thanks for your feedback, I will update the wiki page accordingly.
> > >
> > > Will also close the other voting thread with your vote.
> > >
> > > Guozhang
> > >
> > > On Thu, Sep 5, 2019 at 11:27 AM Matthias J. Sax <ma...@confluent.io>
> > > wrote:
> > >
> > > > I share Bruno's concern about future releases; however, I would make a
> > > > slightly different proposal.
> > > >
> > > > Instead of using "latest", could we just make the config optional and, if
> > > > it is not set, use the new metrics code? This way we don't need to add a
> > > > new version number each time we do a new release (note that it would be
> > > > weird to keep the default value "2.4" in future releases).
> > > >
> > > > For enabling backward compatibility: I don't have a strong opinion on
> > > > whether we should have a single value "0.10.0-2.3" or list each version
> > > > individually.
> > > >
> > > > In KIP-268 (fixing metadata upgrade) we decided to list each version
> > > > individually as it seemed simpler for users. Also, we wanted to hide
> > > > which release uses which metadata version (v0 in 0.10.0, and v1 in
> > > > 0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a single
> > > > value, but that did not seem to give the best user experience.
> > > >
> > > > I think this KIP is a little different though, and both options seem to
> > > > be valid. However, I would like to emphasize that we should optimize for
> > > > user experience (and not for whether it is harder or easier to test; when
> > > > in doubt, we should always take on the burden if it helps to lift the
> > > > burden from users).
> > > >
> > > > Overall, I am +1
> > > >
> > > > Some nits:
> > > >
> > > > (1) I think the motivation section for updating the `StreamsMetrics`
> > > > interface does not make it clear why we need the change. What is the
> > > > issue with the current interface, and how do the new methods address
> > > > that issue?
> > > >
> > > > (2) The metric name `put | put-if-absent .. | get-latency (avg | max)`
> > > > is hard to read because it suggests that there is a `get-latency` method
> > > > call on stores -- can we update it to
> > > >
> > > > `(put | put-if-absent .. | get)-latency (avg | max)`
> > > >
> > > > (3) typo: `When users override it to "2.2" or below,` this should be
> > > > "2.3" -- or maybe even different if Bruno's concern gets addressed.
> > > >
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 9/4/19 12:26 PM, Bruno Cadonna wrote:
> > > > > Hi,
> > > > >
> > > > > I am sorry to restart the discussion here, but I came across a small
> > > > > issue in the KIP.
> > > > >
> > > > > I started to implement KIP-444 and I am a bit concerned about the values
> > > > > for the config `built.in.metrics.version`. In the KIP the possible
> > > > > values are specified as all Kafka Streams versions. I think that this
> > > > > set of values is really hard to maintain in the code and it also blows
> > > > > up the testing burden unnecessarily because all versions need to be
> > > > > tested. My proposal (backed by John) is to use the following values:
> > > > > - `latest` for the latest version of the metrics
> > > > > - `0.10.0-2.3` for the version before `latest`
> > > > > If in the future, let's say in version 4.1, we need to change the
> > > > > metrics again, we would add `2.4-4.0` to the values of the config. With
> > > > > major versions, we could also get rid of some values.
> > > > >
> > > > > WDYT?
> > > > >
> > > > > You can also have a look at the PR
> > > > > https://github.com/apache/kafka/pull/7279 to see this in code.
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > > On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <wa...@gmail.com> wrote:
> > > > >>
> > > > >> Hello Bruno,
> > > > >>
> > > > > >> I've updated the wiki page again per your comments, here's a brief
> > > > > >> summary:
> > > > > >>
> > > > > >> 1. added the list of removed metrics.
> > > > > >> 2. added a task-level INFO metric "dropped-records" that covers all
> > > > > >> scenarios and merges in the existing "late-records-drop",
> > > > > >> "skipped-records", and "expired-window-records-drop".
> > > > > >> 3. renamed the util functions of StreamsMetrics as `addLatencyRateTotal`
> > > > > >> and `addRateTotal` sensors.
> > > > > >>
> > > > > >>
> > > > > >> Since I feel it has incorporated all of your comments I'm going to start
> > > > > >> the vote thread for this KIP now.
> > > > >>
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >>
> > > > > >> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <wa...@gmail.com> wrote:
> > > > > >>
> > > > > >>> Hi Bruno,
> > > > > >>>
> > > > > >>> No, it was not intentional, and we can definitely add the total amount
> > > > > >>> sensor as well -- they are just util functions to save users some lines
> > > > > >>> of code anyways, and should be straightforward.
> > > > >>>
> > > > >>> Guozhang
> > > > >>>
> > > > >>>
> > > > > >>> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io> wrote:
> > > > > >>>
> > > > > >>>> Hi Guozhang,
> > > > > >>>>
> > > > > >>>> I totally missed the total invocation count metric in the javadoc.
> > > > > >>>> Which brings me to a follow-up question. Should the names of the
> > > > > >>>> methods reflect the included total invocation count? We have to rename
> > > > > >>>> them anyways. One option would be to simply add `Total` to the method
> > > > > >>>> names, i.e., `addLatencyAndRateAndTotalSensor` and
> > > > > >>>> `addRateAndTotalSensor` (alternatively without the `And`s). Since
> > > > > >>>> those sensors record exclusively invocations, another option would be
> > > > > >>>> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
> > > > > >>>>
> > > > > >>>> As far as I can see, we have sensors to record invocations but none to
> > > > > >>>> record amounts. Is that intentional? No need to add it to this KIP, I
> > > > > >>>> am just curious.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Bruno
> > > > >>>>
> > > > > >>>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > >>>>>
> > > > > >>>>> Hi Bruno,
> > > > > >>>>>
> > > > > >>>>> Just realized that for `addRateSensor` and `addLatencyAndRateSensor`
> > > > > >>>>> we've actually added the total invocation metric already.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> Guozhang
> > > > >>>>>
> > > > > >>>>> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi Bruno,
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <bruno@confluent.io> wrote:
> > > > > >>>>>>
> > > > > >>>>>>> Hi Guozhang,
> > > > > >>>>>>>
> > > > > >>>>>>> I left my comments inline.
> > > > > >>>>>>>
> > > > > >>>>>>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>> Hello Bruno,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thanks for the feedback, replied inline.
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <bruno@confluent.io> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi Guozhang,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thank you for the KIP.
> > > > >>>>>>>>>
> > > > > >>>>>>>>> 1) As far as I understand, the StreamsMetrics interface is there for
> > > > > >>>>>>>>> user-defined processors. Would it make sense to also add a method to
> > > > > >>>>>>>>> the interface to specify a sensor that records skipped records?
> > > > > >>>>>>>>>
> > > > > >>>>>>>> Not sure I follow... if users want to add a specific skipped-records
> > > > > >>>>>>>> sensor, they can still do that as a "throughput" sensor via
> > > > > >>>>>>>> "addThroughputSensor" and then "record", right?
> > > > > >>>>>>>>
> > > > > >>>>>>>> As an afterthought, maybe it's better to rename `throughput` to `rate` in
> > > > > >>>>>>>> the public APIs since it is really meant for the latter semantics. I did
> > > > > >>>>>>>> not change it just to make fewer API changes / deprecate fewer functions.
> > > > > >>>>>>>> But if we feel it is important we can change it as well.
> > > > > >>>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> I see now that a user can record the rate of skipped records. However,
> > > > > >>>>>>> I was referring to the total number of skipped records. Maybe my
> > > > > >>>>>>> question should be more general: should we allow the user to also
> > > > > >>>>>>> specify sensors for totals or combinations of rate and totals?
> > > > > >>>>>>>
> > > > > >>>>>> Sounds good to me, I will add it to the wiki page as well for
> > > > > >>>>>> StreamsMetrics.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>> Regarding the naming, I like `rate` more than `throughput`, but I
> > > > > >>>>>>> would not fight for it.
> > > > >>>>>>>
> > > > >>>>>>>>
> > > > > >>>>>>>>> 2) What are the semantics of active-task-process and
> > > > > >>>>>>>>> standby-task-process?
> > > > > >>>>>>>>>
> > > > > >>>>>>>> Ah good catch, I think I put it in the wrong column. Just some
> > > > > >>>>>>>> explanations here: Within a thread's looped iterations, it will first try
> > > > > >>>>>>>> to process some records from the active tasks, and then see if there are
> > > > > >>>>>>>> any standby-tasks that can be processed as well (i.e. just reading from the
> > > > > >>>>>>>> restore consumer and applying to the local stores). The ratio metrics are
> > > > > >>>>>>>> for indicating 1) what tasks (active or standby) this thread owns so far,
> > > > > >>>>>>>> and 2) how much time, as a percentage, it spends on each of them.
> > > > > >>>>>>>>
> > > > > >>>>>>>> But this metric should really be a task-level one that includes both the
> > > > > >>>>>>>> thread-id and task-id, and upon task migrations they will be dynamically
> > > > > >>>>>>>> deleted / (re)-created. Each task-id may be owned by multiple
> > > > > >>>>>>>> threads, as one active and others standby, and hence the separation of
> > > > > >>>>>>>> active / standby still seems necessary.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Makes sense.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > > >>>>>>>>> 3) How do dropped-late-records and expired-window-record-drop relate
> > > > > >>>>>>>>> to each other? I guess the former is for records that fall outside the
> > > > > >>>>>>>>> grace period and the latter is for records that are processed after
> > > > > >>>>>>>>> the retention period of the window. Is this correct?
> > > > > >>>>>>>>>
> > > > > >>>>>>>> Yes, that's correct. The names are indeed a bit confusing since they were
> > > > > >>>>>>>> added in different releases historically.
> > > > > >>>>>>>>
> > > > > >>>>>>>> More precisely, the `grace period` is a notion of the operator (hence the
> > > > > >>>>>>>> metric is node-level, though it would only be used for DSL operators) while
> > > > > >>>>>>>> the `retention` is a notion of the store (hence the metric is store-level).
> > > > > >>>>>>>> Usually the grace period will be smaller than the store retention though.
> > > > > >>>>>>>>
> > > > > >>>>>>>> The processor node is aware of the `grace period`, and when it receives a
> > > > > >>>>>>>> record that is older than the grace deadline, the record is dropped
> > > > > >>>>>>>> immediately; otherwise it will still be processed and maybe a new update is
> > > > > >>>>>>>> "put" into the store. The store is aware of its `retention period`, and upon
> > > > > >>>>>>>> a "put" call, if it realizes the record is older than the retention
> > > > > >>>>>>>> deadline, that put call is ignored and the metric is recorded.
> > > > > >>>>>>>>
> > > > > >>>>>>>> We have to separate them here since the window store can be used in both
> > > > > >>>>>>>> DSL and PAPI; in the former case the record would likely already be ignored
> > > > > >>>>>>>> at the processor node level due to the grace period, which is usually
> > > > > >>>>>>>> smaller than retention; but for PAPI there's no grace period and hence the
> > > > > >>>>>>>> processor would likely still process it and call "put" on the store.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Alright! Got it!
> > > > >>>>>>>
> > > > >>>>>>>>
> > > > > >>>>>>>>> 4) Is there an actual difference between skipped and dropped records?
> > > > > >>>>>>>>> If not, shall we unify the terminology?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>> There is. Dropped records are only due to lateness, whereas skipped
> > > > > >>>>>>>> records can be due to serde errors (where the user's error handling
> > > > > >>>>>>>> indicates "skip and continue"), timestamp errors, etc.
> > > > > >>>>>>>>
> > > > > >>>>>>>> I've considered that maybe a better (more extensible) way would be defining
> > > > > >>>>>>>> a single metric name, say skipped-records, but using different tags to
> > > > > >>>>>>>> indicate the skipping reason (errors, windowing semantics, etc). But there's
> > > > > >>>>>>>> still a tricky difference: records skipped due to serde errors, for example,
> > > > > >>>>>>>> are skipped at the very beginning and have no effect at all. Some
> > > > > >>>>>>>> others, e.g. a null key / value at the reduce operator, are only
> > > > > >>>>>>>> skipped in the middle of the processing, i.e. some effects may have already
> > > > > >>>>>>>> been taken in up-stream sub-topologies. And that's why for skipped-records
> > > > > >>>>>>>> I've defined it on both task-level and node-level, and the aggregate of the
> > > > > >>>>>>>> latter may still be smaller than the former, whereas dropped-records
> > > > > >>>>>>>> is only node-level.
> > > > > >>>>>>>>
> > > > > >>>>>>>> So how about an even more significant change then: we enlarge
> > > > > >>>>>>>> `dropped-late-records` to `dropped-records`, which is node-level only but
> > > > > >>>>>>>> includes reasons from lateness to semantics (like null-key) as well; and
> > > > > >>>>>>>> then we have a task-level-only `skipped-records` which only records those
> > > > > >>>>>>>> dropped at the very beginning that did not make it to the processing
> > > > > >>>>>>>> topology at all. I feel this is a clearer distinction but also a bigger
> > > > > >>>>>>>> change for users.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > > >>>>>>> I like the way dropped-records and skipped-records are now
> > > > > >>>>>>> defined. My follow-up question is whether we should give names to
> > > > > >>>>>>> those metrics that better describe their semantics, like:
> > > > >>>>>>>
> > > > >>>>>>> dropped-records-at-source and dropped-records-at-processor
> > > > >>>>>>>
> > > > >>>>>>> or
> > > > >>>>>>>
> > > > >>>>>>> records-dropped-at-source and records-dropped-at-processor
> > > > >>>>>>>
> > > > >>>>>>> or
> > > > >>>>>>>
> > > > >>>>>>> source-dropped-records and processor-dropped-records
> > > > >>>>>>>
> > > > > >>>>>>> or alternatively with skipped. However, I would use the same term as
> > > > > >>>>>>> in expired-window-record-drop.
> > > > > >>>>>>>
> > > > > >>>>>>> Maybe we should also consider renaming expired-window-record-drop to
> > > > > >>>>>>> expired-window-record-dropped to be consistent.
> > > > >>>>>>>
> > > > >>>>>>> WDYT?
> > > > >>>>>>>
> > > > >>>>>>> I was not considering "expired-window-record-drop" before
> > since it
> > > > >>>> is a
> > > > >>>>>> store-level metric, and I was only considering task-level
> > > > >>>> (skipped-records)
> > > > >>>>>> and processor-node-level (dropped-records) metrics, and I'm
> > using
> > > > >>>> different
> > > > >>>>>> terms deliberately to hint users that they are different leveled
> > > > >>>> metrics.
> > > > >>>>>>
> > > > >>>>>> I still feel that using `skip` for task-level metrics indicating
> > > > that
> > > > >>>> this
> > > > >>>>>> record was not processed at all, and using `drop` for
> > > > processor-level
> > > > >>>>>> metrics that this record is only dropped at this stage of the
> > > > >>>> topology is a
> > > > >>>>>> better one; but I'm also okay with some finer grained metrics so
> > > > that
> > > > >>>> we
> > > > >>>>>> can align the processor-level with store-level (they are on the
> > same
> > > > >>>>>> granularity any ways), like:
> > > > >>>>>>
> > > > >>>>>> `dropped-records-null-field`: at processor nodes
> > > > >>>>>>
> > > > >>>>>> `dropped-records-too-late`: at processor nodes
> > > > >>>>>>
> > > > >>>>>> `dropped-records-expired-window`: at window-stores
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> 5) What happens with removed metrics when the user sets the
> > > > >>>> version of
> > > > >>>>>>>>> "built.in.metrics.version" to 2.2-
> > > > >>>>>>>>>
> > > > >>>>>>>>> I think for those redundant ones like "forward-rate" and
> > > > >>>>>>> "destroy-rate"
> > > > >>>>>>>> we can still remove them with 2.2- as well; for other ones
> > that
> > > > are
> > > > >>>>>>> removed
> > > > >>>>>>>> / replaced like thread-level skipped-records we should still
> > > > >>>> maintain
> > > > >>>>>>> them.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Could you add this comment about removal of redundant metrics
> > to
> > > > the
> > > > >>>>>>> KIP such that it is documented somewhere?
> > > > >>>>>>>
> > > > >>>>>>> Yes, for sure.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Bruno
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>> I've also decided to remove the rebalance-related metrics from
> > the
> > > > >>>>>> instance-level and move it to consumer itself as part of
> > KIP-429.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> --
> > > > >>>>>> -- Guozhang
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> --
> > > > >>>>> -- Guozhang
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>> --
> > > > >>> -- Guozhang
> > > > >>>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > >
> > > >
> > >
> > > --
> > > -- Guozhang
> >
>
>
> --
> -- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Bruno,

I think your concern makes sense; let's adopt this suggestion in KIP-444
instead. Just to clarify:

1. The default value would be "latest".
2. The only other valid value is "0.10.0-2.3".

And moving forward this config may stay without any new values.


Guozhang
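
A minimal sketch of the semantics summarized above: the default is "latest"
and the only other valid value is "0.10.0-2.3". It assumes plain
java.util.Properties rather than the real StreamsConfig class; the class name
BuiltInMetricsVersionSketch and the helper resolve() are hypothetical and are
not part of the KIP or of Kafka Streams.

```java
import java.util.Properties;
import java.util.Set;

class BuiltInMetricsVersionSketch {
    // Config key as named in this thread.
    static final String BUILT_IN_METRICS_VERSION = "built.in.metrics.version";

    // The two values agreed on in this thread.
    static final String LATEST = "latest";
    static final String V_0100_TO_23 = "0.10.0-2.3";
    static final Set<String> VALID_VALUES = Set.of(LATEST, V_0100_TO_23);

    // Hypothetical helper: returns the configured metrics version,
    // falls back to "latest" when the config is not set, and rejects
    // any value outside the two agreed ones (e.g. "2.4").
    static String resolve(Properties props) {
        String value = props.getProperty(BUILT_IN_METRICS_VERSION, LATEST);
        if (!VALID_VALUES.contains(value)) {
            throw new IllegalArgumentException(
                "Invalid value for " + BUILT_IN_METRICS_VERSION + ": " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolve(props));   // config not set

        props.setProperty(BUILT_IN_METRICS_VERSION, V_0100_TO_23);
        System.out.println(resolve(props));   // old metrics requested
    }
}
```

With this shape, a later metrics refactoring would add one more range value
(such as the `2.4-4.0` example from Bruno's proposal) instead of one value per
release.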


On Thu, Sep 5, 2019 at 12:16 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> I think user experience and code maintenance are tightly related. The
> harder the code is to maintain, the worse the user experience will get.
>
> Making the config optional does not solve the issue. Wouldn't users be
> puzzled when we release 2.5 and they cannot set
> built.in.metrics.version to 2.4 to be sure to get the same metrics for
> that version? It seems with that solution we would just move
> maintenance to the next release.
>
> I cannot see how the values `latest` and `0.10.0-2.3` would lead to a
> bad user experience.
>
> Regarding testing, at least at the integration test level, we absolutely
> need to test all versions. It is too easy to make a mistake with so
> many versions. Remember that at the integration test level we need to
> start an embedded Kafka for every single test.
>
> Best,
> Bruno
>
> On Thu, Sep 5, 2019 at 8:46 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > Hi Bruno,
> >
> > Thanks for raising this point. I think the main motivation behind this
> > proposal is, like Matthias said, to ease the understanding burden from
> > users to our own shoulders. Testing wise, I think we do not necessarily
> > need to explode the testing matrix but just test the last version before
> > each metrics refactoring (again, hopefully it is the only time), and hence I
> > think it is worth it for the benefit of the user experience. WDYT?
> >
> > Hi Matthias,
> >
> > Thanks for your feedback, I will update the wiki page accordingly.
> >
> > Will also close the other voting thread with your vote.
> >
> > Guozhang
> >
> > On Thu, Sep 5, 2019 at 11:27 AM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> > > I share Bruno's concern about future releases, however, I would make
> > > a slightly different proposal.
> > >
> > > Instead of using "latest" we can just make the config optional and if
> > > not set, we use the new metrics code? This way we don't need to add a
> > > new version number each time we do a new release (note, that it would
> be
> > > weird to keep default value "2.4" in future releases).
> > >
> > > For enabling backward compatibility: I don't have a strong opinion if
> we
> > > should have a single value "0.10.0-2.3" or list each version
> individually.
> > >
> > > In KIP-268 (fixing metadata upgrade) we decided to list each version
> > > individually as it seems simpler for users. Also, we wanted to hide
> > > which release uses which metadata version (v0 in 0.10.0, and v1 in
> > > 0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a single
> > > value though but it seemed not to give best user experience.
> > >
> > > I think this KIP is a little different though and both options seem to
> > > be valid. However, I would like to emphasize that we should optimize for
> > > user experience (and not if it's harder/easier to test etc---in doubt,
> > > we should always take on the burden if it helps to lift the burden from
> > > users).
> > >
> > > Overall, I am +1
> > >
> > > Some nits:
> > >
> > > (1) I think the motivation section for updating `StreamsMetrics`
> > > interface does not make it clear why we need the change. What is the
> > > issue with the current interface and how do the new methods address the
> > > issue?
> > >
> > > (2) The metric name `put | put-if-absent .. | get-latency (avg | max)`
> > > is hard to read because it indicates that there is a `get-latency` method
> > > call on stores -- can we update it to
> > >
> > > `(put | put-if-absent .. | get)-latency (avg | max)`
> > >
> > > (3) typo: `When users override it to "2.2" or below,` this should be
> > > "2.3" -- or maybe even different if Bruno's concern gets addressed.
> > >
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > >
> > >
> > > On 9/4/19 12:26 PM, Bruno Cadonna wrote:
> > > > Hi,
> > > >
> > > > I am sorry to restart the discussion here, but I came across a small
> > > > issue in the KIP.
> > > >
> > > > I started to implement KIP-444 and I am a bit concerned about the values
> > > > for the config `built.in.metrics.version`. In the KIP the possible
> > > > values are specified as all Kafka Streams versions. I think that this
> > > > set of values is really hard to maintain in the code and it also
> blows
> > > > up the testing burden unnecessarily because all versions need to be
> > > > tested. My proposal (backed by John) is to use the following values:
> > > > - `latest` for the latest version of the metrics
> > > > - `0.10.0-2.3` for the version before `latest`
> > > > If in future, let's say in version 4.1, we need again to change the
> > > > metrics, we would add `2.4-4.0` to the values of the config. With
> > > > major versions, we could also get rid of some values.
> > > >
> > > > WDYT?
> > > >
> > > > You can also have a look at the PR
> > > > https://github.com/apache/kafka/pull/7279 to see this in code.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >>
> > > >> Hello Bruno,
> > > >>
> > > >> I've updated the wiki page again per your comments, here's a brief
> > > summary:
> > > >>
> > > >> 1. added the list of removed metrics.
> > > >> 2. added a task-level INFO metric "dropped-records" that covers all
> > > >> scenarios and merges in the existing "late-records-drop",
> > > >> "skipped-records", and "expired-window-records-drop".
> > > >> 3. renamed the util functions of StreamsMetrics as
> `addLatencyRateTotal`
> > > >> and `addRateTotal` sensors.
> > > >>
> > > >>
> > > >> Since I feel it has incorporated all of your comments I'm going to
> start
> > > >> the vote thread for this KIP now.
> > > >>
> > > >>
> > > >> Guozhang
> > > >>
> > > >>
> > > >> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >>
> > > >>> Hi Bruno,
> > > >>>
> > > >>> No it was not intentional, and we can definitely add the total
> amount
> > > >>> sensor as well -- they are just util functions to save users some
> > > lines of
> > > >>> code anyways, and should be straightforward.
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>>
> > > >>> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io>
> > > wrote:
> > > >>>
> > > >>>> Hi Guozhang,
> > > >>>>
> > > >>>> I totally missed the total invocation count metric in the javadoc.
> > > >>>> Which brings me to a follow-up question. Should the names of the
> > > >>>> methods reflect the included total invocation count? We have to
> rename
> > > >>>> them anyways. One option would be to simply add `Total` to the
> method
> > > >>>> names, i.e., `addLatencyAndRateAndTotalSensor` and
> > > >>>> `addRateAndTotalSensor` (alternatively without the `And`s). Since
> > > >>>> those sensors record exclusively invocations, another option
> would be
> > > >>>> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
> > > >>>>
> > > >>>> As far as I can see, we have sensors to record invocations but
> none to
> > > >>>> record amounts. Is that intentional? No need to add it to this
> KIP, I
> > > >>>> am just curious.
> > > >>>>
> > > >>>> Best,
> > > >>>> Bruno
> > > >>>>
> > > >>>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wangguoz@gmail.com
> >
> > > wrote:
> > > >>>>>
> > > >>>>> Hi Bruno,
> > > >>>>>
> > > >>>>> Just realized that for `addRateSensor` and
> `addLatencyAndRateSensor`
> > > >>>> we've
> > > >>>>> actually added the total invocation metric already.
> > > >>>>>
> > > >>>>>
> > > >>>>> Guozhang
> > > >>>>>
> > > >>>>> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <
> wangguoz@gmail.com>
> > > >>>> wrote:
> > > >>>>>
> > > >>>>>> Hi Bruno,
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <
> bruno@confluent.io>
> > > >>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Guozhang,
> > > >>>>>>>
> > > >>>>>>> I left my comments inline.
> > > >>>>>>>
> > > >>>>>>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <
> wangguoz@gmail.com>
> > > >>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Hello Bruno,
> > > >>>>>>>>
> > > >>>>>>>> Thanks for the feedbacks, replied inline.
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <
> bruno@confluent.io>
> > > >>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Guozhang,
> > > >>>>>>>>>
> > > >>>>>>>>> Thank you for the KIP.
> > > >>>>>>>>>
> > > >>>>>>>>> 1) As far as I understand, the StreamsMetrics interface is
> there
> > > >>>> for
> > > >>>>>>>>> user-defined processors. Would it make sense to also add a
> > > >>>> method to
> > > >>>>>>>>> the interface to specify a sensor that records skipped
> records?
> > > >>>>>>>>>
> > > >>>>>>>>> Not sure I follow.. if users want to add a specific skipped
> > > >>>> records
> > > >>>>>>>> sensor, she can still do that as a "throughput" sensor via "
> > > >>>>>>>> addThroughputSensor" and then "record" right?
> > > >>>>>>>>
> > > >>>>>>>> As an after-thought, maybe it's better to rename `throughput`
> to
> > > >>>> `rate`
> > > >>>>>>> in
> > > >>>>>>>> the public APIs since it is really meant for the latter
> semantics.
> > > >>>> I did
> > > >>>>>>>> not change it just to make less API changes / deprecate fewer
> > > >>>> functions.
> > > >>>>>>>> But if we feel it is important we can change it as well.
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> I see now that a user can record the rate of skipped records.
> > > >>>> However,
> > > >>>>>>> I was referring to the total number of skipped records. Maybe
> my
> > > >>>>>>> question should be more general: should we allow the user to
> also
> > > >>>>>>> specify sensors for totals or combinations of rate and totals?
> > > >>>>>>>
> > > >>>>>>> Sounds good to me, I will add it to the wiki page as well for
> > > >>>>>> StreamsMetrics.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>> Regarding the naming, I like `rate` more than `throughput`,
> but I
> > > >>>>>>> would not fight for it.
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> 2) What are the semantics of active-task-process and
> > > >>>>>>> standby-task-process
> > > >>>>>>>>>
> > > >>>>>>>>> Ah good catch, I think I made it in the wrong column. Just
> some
> > > >>>>>>>> explanations here: Within a thread's looped iterations, it
> will
> > > >>>> first
> > > >>>>>>> try
> > > >>>>>>>> to process some records from the active tasks, and then see if
> > > >>>> there are
> > > >>>>>>>> any standby-tasks that can be processed as well (i.e. just
> reading
> > > >>>> from
> > > >>>>>>> the
> > > >>>>>>>> restore consumer and apply to the local stores). The ratio
> metrics
> > > >>>> are
> > > >>>>>>> for
> > > >>>>>>>> indicating 1) what tasks (active or standby) does this thread
> own
> > > >>>> so
> > > >>>>>>> far,
> > > >>>>>>>> and 2) how much time in percentage does it spend on each of
> them.
> > > >>>>>>>>
> > > >>>>>>>> But this metric should really be a task-level one that
> includes
> > > >>>> both the
> > > >>>>>>>> thread-id and task-id, and upon task migrations they will be
> > > >>>> dynamically
> > > >>>>>>>> deleted / (re)-created. For each task-id it may be owned by
> > > >>>> multiple
> > > >>>>>>>> threads as one active and others standby, and hence the
> separation
> > > >>>> of
> > > >>>>>>>> active / standby seems still necessary.
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> Makes sense.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> 3) How do dropped-late-records and expired-window-record-drop
> > > >>>> relate
> > > >>>>>>>>> to each other? I guess the former is for records that fall
> > > >>>> outside the
> > > >>>>>>>>> grace period and the latter is for records that are processed
> > > >>>> after
> > > >>>>>>>>> the retention period of the window. Is this correct?
> > > >>>>>>>>>
> > > >>>>>>>>> Yes, that's correct. The names are indeed a bit confusing
> since
> > > >>>> they
> > > >>>>>>> are
> > > >>>>>>>> added at different releases historically..
> > > >>>>>>>>
> > > >>>>>>>> More precisely, the `grace period` is a notion of the operator
> > > >>>> (hence
> > > >>>>>>> the
> > > >>>>>>>> metric is node-level, though it would only be used for DSL
> > > >>>> operators)
> > > >>>>>>> while
> > > >>>>>>>> the `retention` is a notion of the store (hence the metric is
> > > >>>>>>> store-level).
> > > >>>>>>>> Usually grace period will be smaller than store retention
> though.
> > > >>>>>>>>
> > > >>>>>>>> Processor node is aware of `grace period` and when received a
> > > >>>> record
> > > >>>>>>> that
> > > >>>>>>>> is older than grace deadline, it will be dropped immediately;
> > > >>>> otherwise
> > > >>>>>>> it
> > > > >>>>>>>> will still be processed and maybe a new update is "put" into the
> > > >>>> store.
> > > >>>>>>> The
> > > >>>>>>>> store is aware of its `retention period` and then upon a "put"
> > > >>>> call if
> > > >>>>>>> it
> > > >>>>>>>> realized it is older than the retention deadline, that put
> call
> > > >>>> would be
> > > >>>>>>>> ignored and metric is recorded.
> > > >>>>>>>>
> > > >>>>>>>> We have to separate them here since the window store can be
> used
> > > >>>> in both
> > > >>>>>>>> DSL and PAPI, and for the former case it would likely to be
> > > already
> > > >>>>>>> ignored
> > > >>>>>>>> at the processor node level due to the grace period which is
> > > >>>> usually
> > > >>>>>>>> smaller than retention; but for PAPI there's no grace period
> and
> > > >>>> hence
> > > >>>>>>> the
> > > >>>>>>>> processor would likely still process and call "put" on the
> store.
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> Alright! Got it!
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> 4) Is there an actual difference between skipped and dropped
> > > >>>> records?
> > > >>>>>>>>> If not, shall we unify the terminology?
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>> There is. Dropped records are only due to lateness; where as
> > > >>>> skipped
> > > >>>>>>>> records can be due to serde errors (and user's error handling
> > > >>>> indicate
> > > >>>>>>>> "skip and continue"), timestamp errors, etc.
> > > >>>>>>>>
> > > >>>>>>>> I've considered maybe a better (more extensible) way would be
> > > >>>> defining a
> > > >>>>>>>> single metric name, say skipped-records, but use different
> tags to
> > > >>>>>>> indicate
> > > >>>>>>>> if its skipping reason (errors, windowing semantics, etc). But
> > > >>>> there's
> > > >>>>>>>> still a tricky difference: for serde caused skipping for
> example,
> > > >>>> they
> > > >>>>>>> will
> > > >>>>>>>> be skipped at the very beginning and there's no effects taken
> at
> > > >>>> all.
> > > >>>>>>> For
> > > >>>>>>>> some others e.g. null-key / value at the reduce operator, it
> is
> > > >>>> only
> > > >>>>>>>> skipped at the middle of the processing, i.e. some effects may
> > > have
> > > >>>>>>> already
> > > >>>>>>>> been taken in up-stream sub-topologies. And that's why for
> > > >>>>>>> skipped-records
> > > >>>>>>>> I've defined it on both task-level and node-level and the
> > > >>>> aggregate of
> > > >>>>>>> the
> > > >>>>>>>> latter may still be smaller than the former, whereas for
> > > >>>>>>> dropped-records it
> > > >>>>>>>> is only for node-level.
> > > >>>>>>>>
> > > >>>>>>>> So how about an even more significant change then: we enlarge
> the
> > > >>>>>>>> `dropped-late-records` to `dropped-records` which is
> node-level
> > > >>>> only,
> > > >>>>>>> but
> > > > >>>>>>>> includes reasons from lateness to semantics (like null-key) as
> > > >>>> well; and
> > > >>>>>>>> then we have a task-level-only `skipped-records` which only
> record
> > > >>>> those
> > > >>>>>>>> dropped at the very beginning and did not make it at all to
> the
> > > >>>>>>> processing
> > > >>>>>>>> topology. I feel this is a clearer distinguishment but also a
> > > >>>> bigger
> > > >>>>>>> change
> > > >>>>>>>> to users.
> > > >>>>>>>>
> > > >>>>>>>
> > > > >>>>>>> I like the way dropped-records and skipped-records are now
> > > >>>>>>> defined. My follow-up question is whether we should give names
> to
> > > >>>>>>> those metrics that better describe their semantics, like:
> > > >>>>>>>
> > > >>>>>>> dropped-records-at-source and dropped-records-at-processor
> > > >>>>>>>
> > > >>>>>>> or
> > > >>>>>>>
> > > >>>>>>> records-dropped-at-source and records-dropped-at-processor
> > > >>>>>>>
> > > >>>>>>> or
> > > >>>>>>>
> > > >>>>>>> source-dropped-records and processor-dropped-records
> > > >>>>>>>
> > > >>>>>>> or alternatively with skipped. However, I would use the same
> term
> > > as
> > > >>>>>>> in expired-window-record-drop
> > > >>>>>>>
> > > >>>>>>> Maybe, we should also consider to rename
> expired-window-record-drop
> > > >>>> to
> > > >>>>>>> expired-window-record-dropped to be consistent.
> > > >>>>>>>
> > > >>>>>>> WDYT?
> > > >>>>>>>
> > > >>>>>>> I was not considering "expired-window-record-drop" before
> since it
> > > >>>> is a
> > > >>>>>> store-level metric, and I was only considering task-level
> > > >>>> (skipped-records)
> > > >>>>>> and processor-node-level (dropped-records) metrics, and I'm
> using
> > > >>>> different
> > > >>>>>> terms deliberately to hint users that they are different leveled
> > > >>>> metrics.
> > > >>>>>>
> > > >>>>>> I still feel that using `skip` for task-level metrics indicating
> > > that
> > > >>>> this
> > > >>>>>> record was not processed at all, and using `drop` for
> > > processor-level
> > > >>>>>> metrics that this record is only dropped at this stage of the
> > > >>>> topology is a
> > > >>>>>> better one; but I'm also okay with some finer grained metrics so
> > > that
> > > >>>> we
> > > >>>>>> can align the processor-level with store-level (they are on the
> same
> > > >>>>>> granularity any ways), like:
> > > >>>>>>
> > > >>>>>> `dropped-records-null-field`: at processor nodes
> > > >>>>>>
> > > >>>>>> `dropped-records-too-late`: at processor nodes
> > > >>>>>>
> > > >>>>>> `dropped-records-expired-window`: at window-stores
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> 5) What happens with removed metrics when the user sets the
> > > >>>> version of
> > > >>>>>>>>> "built.in.metrics.version" to 2.2-
> > > >>>>>>>>>
> > > > >>>>>>>>> I think for those redundant ones like "forward-rate" and
> > > >>>>>>> "destroy-rate"
> > > >>>>>>>> we can still remove them with 2.2- as well; for other ones
> that
> > > are
> > > >>>>>>> removed
> > > >>>>>>>> / replaced like thread-level skipped-records we should still
> > > >>>> maintain
> > > >>>>>>> them.
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> Could you add this comment about removal of redundant metrics
> to
> > > the
> > > > >>>>>>> KIP such that it is documented somewhere?
> > > >>>>>>>
> > > >>>>>>> Yes, for sure.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Bruno
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>> I've also decided to remove the rebalance-related metrics from
> the
> > > >>>>>> instance-level and move it to consumer itself as part of
> KIP-429.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> -- Guozhang
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> --
> > > >>>>> -- Guozhang
> > > >>>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> -- Guozhang
> > > >>>
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > >
> > >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Guozhang,

I think user experience and code maintenance are tightly related. The
harder the code is to maintain, the worse the user experience will get.

Making the config optional does not solve the issue. Wouldn't users be
puzzled when we release 2.5 and they cannot set
built.in.metrics.version to 2.4 to be sure to get the same metrics for
that version? It seems with that solution we would just move
maintenance to the next release.

I cannot see how the values `latest` and `0.10.0-2.3` would lead to a
bad user experience.

Regarding testing, at least at the integration test level, we absolutely
need to test all versions. It is too easy to make a mistake with so
many versions. Remember that at the integration test level we need to
start an embedded Kafka for every single test.

Best,
Bruno

On Thu, Sep 5, 2019 at 8:46 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> Hi Bruno,
>
> Thanks for raising this point. I think the main motivation behind this
> proposal is, like Matthias said, to ease the understanding burden from
> users to our own shoulders. Testing wise, I think we do not necessarily
> need to explode the testing matrix but just test the last version before
> each metrics refactoring (again, hopefully it is the only time), and hence I
> think it is worth it for the benefit of the user experience. WDYT?
>
> Hi Matthias,
>
> Thanks for your feedback, I will update the wiki page accordingly.
>
> Will also close the other voting thread with your vote.
>
> Guozhang
>
> On Thu, Sep 5, 2019 at 11:27 AM Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > I share Bruno's concern about future releases, however, I would make
> > a slightly different proposal.
> >
> > Instead of using "latest" we can just make the config optional and if
> > not set, we use the new metrics code? This way we don't need to add a
> > new version number each time we do a new release (note, that it would be
> > weird to keep default value "2.4" in future releases).
> >
> > For enabling backward compatibility: I don't have a strong opinion if we
> > should have a single value "0.10.0-2.3" or list each version individually.
> >
> > In KIP-268 (fixing metadata upgrade) we decided to list each version
> > individually as it seems simpler for users. Also, we wanted to hide
> > which release uses which metadata version (v0 in 0.10.0, and v1 in
> > 0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a single
> > value though but it seemed not to give best user experience.
> >
> > I think this KIP is a little different though and both options seem to
> > be valid. However, I would like to emphasize that we should optimize for
> > user experience (and not if it's harder/easier to test etc---in doubt,
> > we should always take on the burden if it helps to lift the burden from
> > users).
> >
> > Overall, I am +1
> >
> > Some nits:
> >
> > (1) I think the motivation section for updating `StreamsMetrics`
> > interface does not make it clear why we need the change. What is the
> > issue with the current interface and how do the new methods address the
> > issue?
> >
> > (2) The metric name `put | put-if-absent .. | get-latency (avg | max)`
> > is hard to read because it indicates that there is a `get-latency` method
> > call on stores -- can we update it to
> >
> > `(put | put-if-absent .. | get)-latency (avg | max)`
> >
> > (3) typo: `When users override it to "2.2" or below,` this should be
> > "2.3" -- or maybe even different if Bruno's concern gets addressed.
> >
> >
> >
> >
> > -Matthias
> >
> >
> >
> >
> >
> > On 9/4/19 12:26 PM, Bruno Cadonna wrote:
> > > Hi,
> > >
> > > I am sorry to restart the discussion here, but I came across a small
> > > issue in the KIP.
> > >
> > > I started to implement KIP-444 and I am a bit concerned about the values
> > > for the config `built.in.metrics.version`. In the KIP the possible
> > > values are specified as all Kafka Streams versions. I think that this
> > > set of values is really hard to maintain in the code and it also blows
> > > up the testing burden unnecessarily because all versions need to be
> > > tested. My proposal (backed by John) is to use the following values:
> > > - `latest` for the latest version of the metrics
> > > - `0.10.0-2.3` for the version before `latest`
> > > If in future, let's say in version 4.1, we need again to change the
> > > metrics, we would add `2.4-4.0` to the values of the config. With
> > > major versions, we could also get rid of some values.
> > >
> > > WDYT?
> > >
> > > You can also have a look at the PR
> > > https://github.com/apache/kafka/pull/7279 to see this in code.
> > >
> > > Best,
> > > Bruno
> > >
> > > On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >>
> > >> Hello Bruno,
> > >>
> > >> I've updated the wiki page again per your comments, here's a brief
> > summary:
> > >>
> > >> 1. added the list of removed metrics.
> > >> 2. added a task-level INFO metric "dropped-records" that covers all
> > >> scenarios and merges in the existing "late-records-drop",
> > >> "skipped-records", and "expired-window-records-drop".
> > >> 3. renamed the util functions of StreamsMetrics as `addLatencyRateTotal`
> > >> and `addRateTotal` sensors.
> > >>
> > >>
> > >> Since I feel it has incorporated all of your comments I'm going to start
> > >> the vote thread for this KIP now.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >>
> > >>> Hi Bruno,
> > >>>
> > >>> No it was not intentional, and we can definitely add the total amount
> > >>> sensor as well -- they are just util functions to save users some
> > lines of
> > >>> code anyways, and should be straightforward.
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io>
> > wrote:
> > >>>
> > >>>> Hi Guozhang,
> > >>>>
> > >>>> I totally missed the total invocation count metric in the javadoc.
> > >>>> Which brings me to a follow-up question. Should the names of the
> > >>>> methods reflect the included total invocation count? We have to rename
> > >>>> them anyways. One option would be to simply add `Total` to the method
> > >>>> names, i.e., `addLatencyAndRateAndTotalSensor` and
> > >>>> `addRateAndTotalSensor` (alternatively without the `And`s). Since
> > >>>> those sensors record exclusively invocations, another option would be
> > >>>> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
> > >>>>
> > >>>> As far as I can see, we have sensors to record invocations but none to
> > >>>> record amounts. Is that intentional? No need to add it to this KIP, I
> > >>>> am just curious.
> > >>>>
> > >>>> Best,
> > >>>> Bruno
> > >>>>
> > >>>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >>>>>
> > >>>>> Hi Bruno,
> > >>>>>
> > >>>>> Just realized that for `addRateSensor` and `addLatencyAndRateSensor`
> > >>>> we've
> > >>>>> actually added the total invocation metric already.
> > >>>>>
> > >>>>>
> > >>>>> Guozhang
> > >>>>>
> > >>>>> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wa...@gmail.com>
> > >>>> wrote:
> > >>>>>
> > >>>>>> Hi Bruno,
> > >>>>>>
> > >>>>>>
> > >>>>>> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io>
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Guozhang,
> > >>>>>>>
> > >>>>>>> I left my comments inline.
> > >>>>>>>
> > >>>>>>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wa...@gmail.com>
> > >>>> wrote:
> > >>>>>>>>
> > >>>>>>>> Hello Bruno,
> > >>>>>>>>
> > >>>>>>>> Thanks for the feedbacks, replied inline.
> > >>>>>>>>
> > >>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Guozhang,
> > >>>>>>>>>
> > >>>>>>>>> Thank you for the KIP.
> > >>>>>>>>>
> > >>>>>>>>> 1) As far as I understand, the StreamsMetrics interface is there
> > >>>> for
> > >>>>>>>>> user-defined processors. Would it make sense to also add a
> > >>>> method to
> > >>>>>>>>> the interface to specify a sensor that records skipped records?
> > >>>>>>>>>
> > >>>>>>>>> Not sure I follow.. if users want to add a specific skipped
> > >>>> records
> > >>>>>>>> sensor, she can still do that as a "throughput" sensor via "
> > >>>>>>>> addThroughputSensor" and then "record" right?
> > >>>>>>>>
> > >>>>>>>> As an after-thought, maybe it's better to rename `throughput` to
> > >>>> `rate`
> > >>>>>>> in
> > >>>>>>>> the public APIs since it is really meant for the latter semantics.
> > >>>> I did
> > >>>>>>>> not change it just to make less API changes / deprecate fewer
> > >>>> functions.
> > >>>>>>>> But if we feel it is important we can change it as well.
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> I see now that a user can record the rate of skipped records.
> > >>>> However,
> > >>>>>>> I was referring to the total number of skipped records. Maybe my
> > >>>>>>> question should be more general: should we allow the user to also
> > >>>>>>> specify sensors for totals or combinations of rate and totals?
> > >>>>>>>
> > >>>>>>> Sounds good to me, I will add it to the wiki page as well for
> > >>>>>> StreamsMetrics.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>> Regarding the naming, I like `rate` more than `throughput`, but I
> > >>>>>>> would not fight for it.
> > >>>>>>>
> > >>>>>>>>
> > >>>>>>>>> 2) What are the semantics of active-task-process and
> > >>>>>>> standby-task-process
> > >>>>>>>>>
> > >>>>>>>> Ah, good catch, I think I put it in the wrong column. Some
> > >>>>>>>> explanation: within each iteration of a thread's loop, it first tries
> > >>>>>>>> to process some records from the active tasks, and then checks whether
> > >>>>>>>> any standby tasks can be processed as well (i.e. just reading from the
> > >>>>>>>> restore consumer and applying to the local stores). The ratio metrics
> > >>>>>>>> indicate 1) which tasks (active or standby) this thread owns so far,
> > >>>>>>>> and 2) what percentage of its time it spends on each of them.
> > >>>>>>>>
> > >>>>>>>> But this metric should really be a task-level one that includes both
> > >>>>>>>> the thread-id and task-id, and upon task migration it will be
> > >>>>>>>> dynamically deleted / (re-)created. Each task-id may be owned by
> > >>>>>>>> multiple threads, one as active and the others as standby, hence the
> > >>>>>>>> separation of active / standby still seems necessary.
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> Makes sense.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>> 3) How do dropped-late-records and expired-window-record-drop
> > >>>> relate
> > >>>>>>>>> to each other? I guess the former is for records that fall
> > >>>> outside the
> > >>>>>>>>> grace period and the latter is for records that are processed
> > >>>> after
> > >>>>>>>>> the retention period of the window. Is this correct?
> > >>>>>>>>>
> > >>>>>>>> Yes, that's correct. The names are indeed a bit confusing since they
> > >>>>>>>> were added in different releases historically.
> > >>>>>>>>
> > >>>>>>>> More precisely, the `grace period` is a notion of the operator (hence
> > >>>>>>>> the metric is node-level, though it would only be used for DSL
> > >>>>>>>> operators), while the `retention` is a notion of the store (hence the
> > >>>>>>>> metric is store-level). Usually the grace period will be smaller than
> > >>>>>>>> the store retention, though.
> > >>>>>>>>
> > >>>>>>>> The processor node is aware of the `grace period`: when it receives a
> > >>>>>>>> record that is older than the grace deadline, the record is dropped
> > >>>>>>>> immediately; otherwise it is still processed and maybe a new update is
> > >>>>>>>> "put" into the store. The store is aware of its `retention period`:
> > >>>>>>>> upon a "put" call, if it realizes the record is older than the
> > >>>>>>>> retention deadline, that put call is ignored and the metric is
> > >>>>>>>> recorded.
> > >>>>>>>>
> > >>>>>>>> We have to separate them here since the window store can be used in
> > >>>>>>>> both the DSL and the PAPI. In the former case the record would likely
> > >>>>>>>> already be dropped at the processor-node level due to the grace
> > >>>>>>>> period, which is usually smaller than the retention; but for the PAPI
> > >>>>>>>> there is no grace period, hence the processor would likely still
> > >>>>>>>> process it and call "put" on the store.
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> Alright! Got it!
> > >>>>>>>
> > >>>>>>>>
> > >>>>>>>>> 4) Is there an actual difference between skipped and dropped
> > >>>> records?
> > >>>>>>>>> If not, shall we unify the terminology?
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>> There is. Dropped records are only due to lateness, whereas skipped
> > >>>>>>>> records can be due to serde errors (when the user's error handling
> > >>>>>>>> indicates "skip and continue"), timestamp errors, etc.
> > >>>>>>>>
> > >>>>>>>> I've considered that a better (more extensible) way might be defining
> > >>>>>>>> a single metric name, say skipped-records, with different tags to
> > >>>>>>>> indicate the skipping reason (errors, windowing semantics, etc). But
> > >>>>>>>> there's still a tricky difference: records skipped because of serde
> > >>>>>>>> errors, for example, are skipped at the very beginning and no effects
> > >>>>>>>> are taken at all. Some others, e.g. null key / value at the reduce
> > >>>>>>>> operator, are only skipped in the middle of the processing, i.e. some
> > >>>>>>>> effects may have already been taken in upstream sub-topologies. That's
> > >>>>>>>> why I've defined skipped-records on both task-level and node-level,
> > >>>>>>>> and the aggregate of the latter may still be smaller than the former,
> > >>>>>>>> whereas dropped-records is only node-level.
> > >>>>>>>>
> > >>>>>>>> So how about an even more significant change then: we enlarge
> > >>>>>>>> `dropped-late-records` to `dropped-records`, which is node-level only
> > >>>>>>>> but includes reasons from lateness to semantics (like null-key) as
> > >>>>>>>> well; and then we have a task-level-only `skipped-records`, which only
> > >>>>>>>> records those dropped at the very beginning that did not make it into
> > >>>>>>>> the processing topology at all. I feel this is a clearer distinction
> > >>>>>>>> but also a bigger change for users.
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> I like the way dropped-records and skipped-records are now
> > >>>>>>> defined. My follow-up question is whether we should give names to
> > >>>>>>> those metrics that better describe their semantics, like:
> > >>>>>>>
> > >>>>>>> dropped-records-at-source and dropped-records-at-processor
> > >>>>>>>
> > >>>>>>> or
> > >>>>>>>
> > >>>>>>> records-dropped-at-source and records-dropped-at-processor
> > >>>>>>>
> > >>>>>>> or
> > >>>>>>>
> > >>>>>>> source-dropped-records and processor-dropped-records
> > >>>>>>>
> > >>>>>>> or alternatively with skipped. However, I would use the same term as
> > >>>>>>> in expired-window-record-drop.
> > >>>>>>>
> > >>>>>>> Maybe we should also consider renaming expired-window-record-drop to
> > >>>>>>> expired-window-record-dropped to be consistent.
> > >>>>>>>
> > >>>>>>> WDYT?
> > >>>>>>>
> > >>>>>> I was not considering "expired-window-record-drop" before since it is
> > >>>>>> a store-level metric; I was only considering the task-level
> > >>>>>> (skipped-records) and processor-node-level (dropped-records) metrics,
> > >>>>>> and I'm using different terms deliberately to hint to users that they
> > >>>>>> are metrics at different levels.
> > >>>>>>
> > >>>>>> I still feel it is better to use `skip` for task-level metrics,
> > >>>>>> indicating that the record was not processed at all, and `drop` for
> > >>>>>> processor-level metrics, indicating that the record is only dropped at
> > >>>>>> this stage of the topology. But I'm also okay with some finer-grained
> > >>>>>> metrics so that we can align the processor-level with the store-level
> > >>>>>> (they are at the same granularity anyway), like:
> > >>>>>>
> > >>>>>> `dropped-records-null-field`: at processor nodes
> > >>>>>>
> > >>>>>> `dropped-records-too-late`: at processor nodes
> > >>>>>>
> > >>>>>> `dropped-records-expired-window`: at window-stores
> > >>>>>>
> > >>>>>>
> > >>>>>>>>
> > >>>>>>>>> 5) What happens with removed metrics when the user sets the
> > >>>> version of
> > >>>>>>>>> "built.in.metrics.version" to 2.2-
> > >>>>>>>>>
> > >>>>>>>> I think the redundant ones like "forward-rate" and "destroy-rate" we
> > >>>>>>>> can still remove with 2.2- as well; the other ones that are removed /
> > >>>>>>>> replaced, like thread-level skipped-records, we should still maintain.
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> Could you add this comment about the removal of redundant metrics to
> > >>>>>>> the KIP so that it is documented somewhere?
> > >>>>>>>
> > >>>>>>> Yes, for sure.
> > >>>>>>
> > >>>>>>
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Bruno
> > >>>>>>>
> > >>>>>>
> > >>>>>> I've also decided to remove the rebalance-related metrics from the
> > >>>>>> instance level and move them to the consumer itself as part of KIP-429.
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>> --
> > >>>>> -- Guozhang
> > >>>>
> > >>>
> > >>>
> > >>> --
> > >>> -- Guozhang
> > >>>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> >
> >
>
> --
> -- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Bruno,

Thanks for raising this point. I think the main motivation behind this
proposal is, like Matthias said, to shift the burden of understanding from
users onto our own shoulders. Testing-wise, I think we do not necessarily
need to explode the testing matrix, but can just test the last version before
each metrics refactoring (again, hopefully this is the only time), and hence I
think it is worth it for the benefit of the user experience. WDYT?

Hi Matthias,

Thanks for your feedback, I will update the wiki page accordingly.

Will also close the other voting thread with your vote.

Guozhang

On Thu, Sep 5, 2019 at 11:27 AM Matthias J. Sax <ma...@confluent.io>
wrote:

> I share Bruno's concern about future releases; however, I would make a
> slightly different proposal.
>
> Instead of using "latest", we can just make the config optional and, if
> it is not set, use the new metrics code? This way we don't need to add a
> new version number each time we do a new release (note that it would be
> weird to keep the default value "2.4" in future releases).
>
> For enabling backward compatibility: I don't have a strong opinion if we
> should have a single value "0.10.0-2.3" or list each version individually.
>
> In KIP-268 (fixing metadata upgrade) we decided to list each version
> individually as it seems simpler for users. Also, we wanted to hide
> which release uses which metadata version (v0 in 0.10.0, and v1 in
> 0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a single
> value, but that did not seem to give the best user experience.
>
> I think this KIP is a little different though, and both options seem to
> be valid. However, I would like to emphasize that we should optimize for
> user experience (and not for whether it's harder/easier to test, etc.;
> when in doubt, we should always take on the burden if it helps to lift
> the burden from users).
>
> Overall, I am +1
>
> Some nits:
>
> (1) I think the motivation section for updating the `StreamsMetrics`
> interface does not make it clear why we need the change. What is the
> issue with the current interface, and how do the new methods address
> it?
>
> (2) The metric name `put | put-if-absent .. | get-latency (avg | max)`
> is hard to read because it suggests that there is a `get-latency` method
> call on stores -- can we update it to
>
> `(put | put-if-absent .. | get)-latency (avg | max)`
>
> (3) typo: `When users override it to "2.2" or below,` this should be
> "2.3" -- or maybe even different if Bruno's concern gets addressed.
>
>
>
>
> -Matthias

-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I share Bruno's concern about future releases; however, I would make a
slightly different proposal.

Instead of using "latest", we can just make the config optional and, if
it is not set, use the new metrics code? This way we don't need to add a
new version number each time we do a new release (note that it would be
weird to keep the default value "2.4" in future releases).

For enabling backward compatibility: I don't have a strong opinion if we
should have a single value "0.10.0-2.3" or list each version individually.
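
To make the two points above concrete, here is a minimal sketch (not the
actual Kafka Streams code) of how an optional config could resolve to a
metrics version. The class, enum, and method names are hypothetical; only
the config name `built.in.metrics.version` and the value `0.10.0-2.3` come
from this thread. It assumes the single-range variant for backward
compatibility.

```java
import java.util.Properties;

// Hypothetical sketch; not the actual Kafka Streams implementation.
final class MetricsVersionSketch {
    // Config name as discussed in the KIP.
    static final String CONFIG_KEY = "built.in.metrics.version";
    // Single backward-compatibility value proposed in this thread.
    static final String LEGACY_RANGE = "0.10.0-2.3";

    enum Version { LATEST, FROM_0100_TO_23 }

    // Maps a raw config value (null when the config is not set) to a version.
    static Version fromConfigValue(String configured) {
        if (configured == null) {
            return Version.LATEST;          // config not set: use the new metrics code
        }
        if (LEGACY_RANGE.equals(configured)) {
            return Version.FROM_0100_TO_23; // explicit opt-in to the old metrics
        }
        throw new IllegalArgumentException(
            "Unsupported value for " + CONFIG_KEY + ": " + configured);
    }

    // Convenience method reading the value from user-supplied properties.
    static Version fromProperties(Properties props) {
        return fromConfigValue(props.getProperty(CONFIG_KEY));
    }
}
```

With this shape a new release would not need a new default value; only a
future metrics refactoring would add another range constant.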

In KIP-268 (fixing metadata upgrade) we decided to list each version
individually as it seems simpler for users. Also, we wanted to hide
which release uses which metadata version (v0 in 0.10.0, and v1 in
0.10.1 to 1.1). We could have collapsed `0.10.1` to `1.1` into a single
value, but that did not seem to give the best user experience.

I think this KIP is a little different though, and both options seem to
be valid. However, I would like to emphasize that we should optimize for
user experience (and not for whether it's harder/easier to test, etc.;
when in doubt, we should always take on the burden if it helps to lift
the burden from users).

Overall, I am +1

Some nits:

(1) I think the motivation section for updating the `StreamsMetrics`
interface does not make it clear why we need the change. What is the
issue with the current interface, and how do the new methods address it?

(2) The metric name `put | put-if-absent .. | get-latency (avg | max)`
is hard to read because it suggests that there is a `get-latency` method
call on stores -- can we update it to

`(put | put-if-absent .. | get)-latency (avg | max)`

(3) typo: `When users override it to "2.2" or below,` this should be
"2.3" -- or maybe even different if Bruno's concern gets addressed.

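As an illustration of nit (2), the parenthesized form is meant to read as
one metric per store operation and statistic. The sketch below expands the
notation under the assumption that names are joined as
`<operation>-latency-<stat>`; the helper class is hypothetical, and the
operations elided by `..` in the KIP are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, only to illustrate how the grouped notation expands.
final class StoreLatencyMetricNames {
    // Builds "<operation>-latency-<stat>" for every operation/stat pair.
    static List<String> expand(List<String> operations, List<String> stats) {
        List<String> names = new ArrayList<>();
        for (String operation : operations) {
            for (String stat : stats) {
                names.add(operation + "-latency-" + stat);
            }
        }
        return names;
    }
}
```

Called with the operations `put`, `put-if-absent`, `get` and the statistics
`avg`, `max`, this yields names such as `put-latency-avg` and
`get-latency-max`, and never a metric for a `get-latency` operation.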



-Matthias





On 9/4/19 12:26 PM, Bruno Cadonna wrote:
> Hi,
> 
> I am sorry to restart the discussion here, but I came across a small
> issue in the KIP.
> 
> I started to implement KIP-444 and I am a bit concerned about the values
> for the config `built.in.metrics.version`. In the KIP the possible
> values are specified as all Kafka Streams versions. I think that this
> set of values is really hard to maintain in the code, and it also blows
> up the testing burden unnecessarily because all versions need to be
> tested. My proposal (backed by John) is to use the following values:
> - `latest` for the latest version of the metrics
> - `0.10.0-2.3` for the version before `latest`
> If in the future, let's say in version 4.1, we need to change the
> metrics again, we would add `2.4-4.0` to the values of the config. With
> major versions, we could also get rid of some values.
> 
> WDYT?
> 
> You can also have a look at the PR
> https://github.com/apache/kafka/pull/7279 to see this in code.
> 
> Best,
> Bruno
> 
> On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <wa...@gmail.com> wrote:
>>
>> Hello Bruno,
>>
>> I've updated the wiki page again per your comments, here's a brief summary:
>>
>> 1. added the list of removed metrics.
>> 2. added a task-level INFO metric "dropped-records" that covers all
>> scenarios and merges in the existing "late-records-drop",
>> "skipped-records", and "expired-window-records-drop".
>> 3. renamed the util functions of StreamsMetrics as `addLatencyRateTotal`
>> and `addRateTotal` sensors.
>>
>>
>> Since I feel it has incorporated all of your comments I'm going to start
>> the vote thread for this KIP now.
>>
>>
>> Guozhang
>>
>>
>> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <wa...@gmail.com> wrote:
>>
>>> Hi Bruno,
>>>
>>> No it was not intentional, and we can definitely add the total amount
>>> sensor as well -- they are just util functions to save users some lines of
>>> code anyways, and should be straightforward.
>>>
>>> Guozhang
>>>
>>>
>>> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>
>>>> Hi Guozhang,
>>>>
>>>> I totally missed the total invocation count metric in the javadoc.
>>>> Which brings me to a follow-up question. Should the names of the
>>>> methods reflect the included total invocation count? We have to rename
>>>> them anyways. One option would be to simply add `Total` to the method
>>>> names, i.e., `addLatencyAndRateAndTotalSensor` and
>>>> `addRateAndTotalSensor` (alternatively without the `And`s). Since
>>>> those sensors record exclusively invocations, another option would be
>>>> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
>>>>
>>>> As far as I can see, we have sensors to record invocations but none to
>>>> record amounts. Is that intentional? No need to add it to this KIP, I
>>>> am just curious.
>>>>
>>>> Best,
>>>> Bruno
>>>>
>>>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wa...@gmail.com> wrote:
>>>>>
>>>>> Hi Bruno,
>>>>>
>>>>> Just realized that for `addRateSensor` and `addLatencyAndRateSensor`
>>>>> we've actually added the total invocation metric already.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wa...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Hi Bruno,
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io>
>>>> wrote:
>>>>>>
>>>>>>> Hi Guozhang,
>>>>>>>
>>>>>>> I left my comments inline.
>>>>>>>
>>>>>>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wa...@gmail.com>
>>>> wrote:
>>>>>>>>
>>>>>>>> Hello Bruno,
>>>>>>>>
>>>>>>>> Thanks for the feedback; replied inline.
>>>>>>>>
>>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Guozhang,
>>>>>>>>>
>>>>>>>>> Thank you for the KIP.
>>>>>>>>>
>>>>>>>>> 1) As far as I understand, the StreamsMetrics interface is there
>>>> for
>>>>>>>>> user-defined processors. Would it make sense to also add a
>>>> method to
>>>>>>>>> the interface to specify a sensor that records skipped records?
>>>>>>>>>
>>>>>>>> Not sure I follow. If a user wants to add a specific skipped-records
>>>>>>>> sensor, they can still do that as a "throughput" sensor via
>>>>>>>> `addThroughputSensor` and then call `record`, right?
>>>>>>>>
>>>>>>>> As an afterthought, maybe it's better to rename `throughput` to `rate`
>>>>>>>> in the public APIs, since it really has the latter semantics. I did
>>>>>>>> not change it only to make fewer API changes / deprecate fewer
>>>>>>>> functions. But if we feel it is important we can change it as well.
>>>>>>>>
>>>>>>>
>>>>>>> I see now that a user can record the rate of skipped records.
>>>> However,
>>>>>>> I was referring to the total number of skipped records. Maybe my
>>>>>>> question should be more general: should we allow the user to also
>>>>>>> specify sensors for totals or combinations of rate and totals?
>>>>>>>
>>>>>> Sounds good to me, I will add it to the wiki page as well for
>>>>>> StreamsMetrics.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> Regarding the naming, I like `rate` more than `throughput`, but I
>>>>>>> would not fight for it.
>>>>>>>
>>>>>>>>
>>>>>>>>> 2) What are the semantics of active-task-process and
>>>>>>>>> standby-task-process?
>>>>>>>>>
>>>>>>>> Ah, good catch, I think I put it in the wrong column. Some
>>>>>>>> explanation here: within a thread's looped iterations, it will first
>>>>>>>> try to process some records from the active tasks, and then see if
>>>>>>>> there are any standby tasks that can be processed as well (i.e. just
>>>>>>>> reading from the restore consumer and applying to the local stores).
>>>>>>>> The ratio metrics indicate 1) which tasks (active or standby) this
>>>>>>>> thread owns so far, and 2) what percentage of its time it spends on
>>>>>>>> each of them.
>>>>>>>>
>>>>>>>> But this metric should really be a task-level one that includes both
>>>>>>>> the thread-id and task-id, and upon task migrations the metrics will
>>>>>>>> be dynamically deleted / (re-)created. Each task-id may be owned by
>>>>>>>> multiple threads, as one active and others standby, and hence the
>>>>>>>> separation of active / standby still seems necessary.
>>>>>>>>
>>>>>>>
>>>>>>> Makes sense.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> 3) How do dropped-late-records and expired-window-record-drop relate
>>>>>>>>> to each other? I guess the former is for records that fall outside
>>>>>>>>> the grace period and the latter is for records that are processed
>>>>>>>>> after the retention period of the window. Is this correct?
>>>>>>>>>
>>>>>>>> Yes, that's correct. The names are indeed a bit confusing since they
>>>>>>>> were added in different releases historically.
>>>>>>>>
>>>>>>>> More precisely, the `grace period` is a notion of the operator (hence
>>>>>>>> the metric is node-level, though it would only be used for DSL
>>>>>>>> operators), while the `retention` is a notion of the store (hence the
>>>>>>>> metric is store-level). Usually the grace period will be smaller than
>>>>>>>> the store retention, though.
>>>>>>>>
>>>>>>>> The processor node is aware of the `grace period`, and when it
>>>>>>>> receives a record that is older than the grace deadline, the record is
>>>>>>>> dropped immediately; otherwise it will still be processed, and maybe a
>>>>>>>> new update is "put" into the store. The store is aware of its
>>>>>>>> `retention period`, and upon a "put" call, if it realizes the record
>>>>>>>> is older than the retention deadline, that put call is ignored and the
>>>>>>>> metric is recorded.
>>>>>>>>
>>>>>>>> We have to separate them here since the window store can be used in
>>>>>>>> both the DSL and the PAPI. In the former case the record would likely
>>>>>>>> already be dropped at the processor node level due to the grace
>>>>>>>> period, which is usually smaller than the retention; but for the PAPI
>>>>>>>> there is no grace period, and hence the processor would likely still
>>>>>>>> process the record and call "put" on the store.
>>>>>>>>
>>>>>>>
>>>>>>> Alright! Got it!
>>>>>>>
>>>>>>>>
>>>>>>>>> 4) Is there an actual difference between skipped and dropped
>>>>>>>>> records? If not, shall we unify the terminology?
>>>>>>>>>
>>>>>>>> There is. Dropped records are only due to lateness, whereas skipped
>>>>>>>> records can be due to serde errors (when the user's error handling
>>>>>>>> indicates "skip and continue"), timestamp errors, etc.
>>>>>>>>
>>>>>>>> I've considered that maybe a better (more extensible) way would be
>>>>>>>> defining a single metric name, say skipped-records, but using
>>>>>>>> different tags to indicate the skipping reason (errors, windowing
>>>>>>>> semantics, etc.). But there's still a tricky difference: records
>>>>>>>> skipped because of serde errors, for example, are skipped at the very
>>>>>>>> beginning and no effects are taken at all. Some others, e.g. null-key
>>>>>>>> / value at the reduce operator, are only skipped in the middle of the
>>>>>>>> processing, i.e. some effects may have already been taken in upstream
>>>>>>>> sub-topologies. And that's why I've defined skipped-records on both
>>>>>>>> task-level and node-level, and the aggregate of the latter may still
>>>>>>>> be smaller than the former, whereas dropped-records is only
>>>>>>>> node-level.
>>>>>>>>
>>>>>>>> So how about an even more significant change then: we enlarge
>>>>>>>> `dropped-late-records` to `dropped-records`, which is node-level only
>>>>>>>> but includes reasons from lateness to semantics (like null-key) as
>>>>>>>> well; and then we have a task-level-only `skipped-records`, which only
>>>>>>>> records those dropped at the very beginning that did not make it to
>>>>>>>> the processing topology at all. I feel this is a clearer distinction,
>>>>>>>> but also a bigger change for users.
>>>>>>>>
>>>>>>>
>>>>>>> I like the way dropped-records and skipped-records are now
>>>>>>> defined. My follow-up question is whether we should give names to
>>>>>>> those metrics that better describe their semantics, like:
>>>>>>>
>>>>>>> dropped-records-at-source and dropped-records-at-processor
>>>>>>>
>>>>>>> or
>>>>>>>
>>>>>>> records-dropped-at-source and records-dropped-at-processor
>>>>>>>
>>>>>>> or
>>>>>>>
>>>>>>> source-dropped-records and processor-dropped-records
>>>>>>>
>>>>>>> or alternatively with skipped. However, I would use the same term as
>>>>>>> in expired-window-record-drop
>>>>>>>
>>>>>>> Maybe we should also consider renaming expired-window-record-drop to
>>>>>>> expired-window-record-dropped to be consistent.
>>>>>>>
>>>>>>> WDYT?
>>>>>>>
>>>>>> I was not considering "expired-window-record-drop" before since it is
>>>>>> a store-level metric, and I was only considering task-level
>>>>>> (skipped-records) and processor-node-level (dropped-records) metrics.
>>>>>> I'm using different terms deliberately to hint to users that they are
>>>>>> metrics at different levels.
>>>>>>
>>>>>> I still feel it is better to use `skip` for task-level metrics,
>>>>>> indicating that the record was not processed at all, and `drop` for
>>>>>> processor-level metrics, indicating that the record was only dropped
>>>>>> at this stage of the topology. But I'm also okay with some
>>>>>> finer-grained metrics so that we can align the processor-level with
>>>>>> the store-level (they are on the same granularity anyway), like:
>>>>>>
>>>>>> `dropped-records-null-field`: at processor nodes
>>>>>>
>>>>>> `dropped-records-too-late`: at processor nodes
>>>>>>
>>>>>> `dropped-records-expired-window`: at window-stores
>>>>>>
>>>>>>
>>>>>>>>
>>>>>>>>> 5) What happens with removed metrics when the user sets the version
>>>>>>>>> of "built.in.metrics.version" to 2.2-?
>>>>>>>>>
>>>>>>>> I think for those redundant ones like "forward-rate" and
>>>>>>>> "destroy-rate" we can still remove them with 2.2- as well; for other
>>>>>>>> ones that are removed / replaced, like thread-level skipped-records,
>>>>>>>> we should still maintain them.
>>>>>>>>
>>>>>>>
>>>>>>> Could you add this comment about removal of redundant metrics to the
>>>>>>> KIP so that it is documented somewhere?
>>>>>>>
>>>>>> Yes, for sure.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>>
>>>>>>
>>>>>> I've also decided to remove the rebalance-related metrics from the
>>>>>> instance-level and move them to the consumer itself as part of KIP-429.
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>> --
>> -- Guozhang
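
The grace-period versus retention distinction quoted in this message (the processor node drops records older than the grace deadline, while the store ignores "put" calls older than its retention deadline) can be pictured with a small model. This is only a sketch under simplifying assumptions: every class, field, and method name below is invented for illustration and is not Kafka Streams code, and windows are keyed directly by the record timestamp.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: all names here are hypothetical and not part of Kafka Streams.
class GraceVsRetentionSketch {

    // Models a PAPI processor, which has no grace period.
    static final long NO_GRACE = -1L;

    // Store-level check: a put older than the retention deadline is ignored.
    static class WindowStoreSketch {
        final long retentionMs;
        final Map<Long, String> windows = new HashMap<>();
        long expiredWindowDrops = 0; // stands in for the store-level metric

        WindowStoreSketch(long retentionMs) {
            this.retentionMs = retentionMs;
        }

        void put(long timestampMs, String value, long streamTimeMs) {
            if (timestampMs < streamTimeMs - retentionMs) {
                expiredWindowDrops++;
                return;
            }
            windows.put(timestampMs, value);
        }
    }

    // Node-level check: a record older than the grace deadline never reaches the store.
    static class ProcessorSketch {
        final long graceMs;
        final WindowStoreSketch store;
        long lateRecordDrops = 0; // stands in for the node-level metric

        ProcessorSketch(long graceMs, WindowStoreSketch store) {
            this.graceMs = graceMs;
            this.store = store;
        }

        void process(long timestampMs, String value, long streamTimeMs) {
            if (graceMs != NO_GRACE && timestampMs < streamTimeMs - graceMs) {
                lateRecordDrops++;
                return;
            }
            store.put(timestampMs, value, streamTimeMs);
        }
    }

    public static void main(String[] args) {
        WindowStoreSketch store = new WindowStoreSketch(500);
        ProcessorSketch dsl = new ProcessorSketch(100, store);
        ProcessorSketch papi = new ProcessorSketch(NO_GRACE, store);

        dsl.process(950, "in-grace", 1000);  // within grace: stored
        dsl.process(800, "late", 1000);      // older than grace deadline: dropped at the node
        papi.process(400, "expired", 1000);  // no grace check: reaches the store, put is ignored

        System.out.println("node-level drops:  " + dsl.lateRecordDrops);
        System.out.println("store-level drops: " + store.expiredWindowDrops);
    }
}
```

With a grace period shorter than the retention, the DSL-style processor absorbs the late record before the store sees it, which is why the two counters can diverge.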


Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi,

I am sorry to restart the discussion here, but I came across a small
issue in the KIP.

I started to implement KIP-444 and I am a bit concerned about the values
for the config `built.in.metrics.version`. In the KIP the possible
values are specified as all Kafka Streams versions. I think that this
set of values is really hard to maintain in the code, and it also blows
up the testing burden unnecessarily because all versions need to be
tested. My proposal (backed by John) is to use the following values:
- `latest` for the latest version of the metrics
- `0.10.0-2.3` for the version before `latest`
If in the future, let's say in version 4.1, we need to change the
metrics again, we would add `2.4-4.0` to the values of the config. With
major versions, we could also get rid of some values.
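
As a rough illustration of the proposal (a sketch only, not the code from the PR): the config key and the two values are the ones named above, while the class, constants, and method are made up for this example, and defaulting to `latest` is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Illustration only: hypothetical helper, not the implementation in the PR.
class BuiltInMetricsVersionSketch {
    static final String CONFIG_KEY = "built.in.metrics.version";
    static final String LATEST = "latest";
    static final String FROM_0100_TO_23 = "0.10.0-2.3";

    // A closed set of ranges instead of one value per Kafka Streams release.
    static final List<String> VALID_VALUES = Arrays.asList(LATEST, FROM_0100_TO_23);

    // Returns the configured metrics version; falling back to "latest" is an assumption.
    static String resolve(Properties props) {
        String value = props.getProperty(CONFIG_KEY, LATEST);
        if (!VALID_VALUES.contains(value)) {
            throw new IllegalArgumentException(
                "Invalid value '" + value + "' for " + CONFIG_KEY + ", expected one of " + VALID_VALUES);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(CONFIG_KEY, FROM_0100_TO_23);
        System.out.println(CONFIG_KEY + " = " + resolve(props));
    }
}
```

With this shape, a later metrics change would only add one more range string such as `2.4-4.0` to the list, rather than one entry per release.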

WDYT?

You can also have a look at the PR
https://github.com/apache/kafka/pull/7279 to see this in code.

Best,
Bruno

On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> Hello Bruno,
>
> I've updated the wiki page again per your comments, here's a brief summary:
>
> 1. added the list of removed metrics.
> 2. added a task-level INFO metric "dropped-records" that covers all
> scenarios and merges in the existing "late-records-drop",
> "skipped-records", and "expired-window-records-drop".
> 3. renamed the util functions of StreamsMetrics as `addLatencyRateTotal`
> and `addRateTotal` sensors.
>
>
> Since I feel it has incorporated all of your comments I'm going to start
> the vote thread for this KIP now.
>
>
> Guozhang
>
>

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Bruno,

I've updated the wiki page again per your comments. Here's a brief summary:

1. added the list of removed metrics.
2. added a task-level INFO metric "dropped-records" that covers all
scenarios and merges in the existing "late-records-drop",
"skipped-records", and "expired-window-records-drop".
3. renamed the util functions of StreamsMetrics as `addLatencyRateTotal`
and `addRateTotal` sensors.
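
To picture what a combined rate-and-total sensor (item 3) reports, here is a minimal sketch. The class below is a hypothetical stand-in, not the StreamsMetrics API, and the rate is a plain average since a start time, which is a simplification chosen for this example.

```java
// Illustration only: hypothetical stand-in for a sensor that exposes both a
// rate and a running total of invocations.
class RateTotalSketch {
    private final long startMs;
    private long total = 0;

    RateTotalSketch(long startMs) {
        this.startMs = startMs;
    }

    // Record one invocation.
    void record() {
        total++;
    }

    // Total number of recorded invocations.
    long total() {
        return total;
    }

    // Average invocations per second since startMs (simplified rate).
    double ratePerSecond(long nowMs) {
        long elapsedMs = Math.max(1L, nowMs - startMs);
        return total * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        RateTotalSketch sensor = new RateTotalSketch(0L);
        sensor.record();
        sensor.record();
        sensor.record();
        System.out.println("total = " + sensor.total());
        System.out.println("rate  = " + sensor.ratePerSecond(2000L) + " per second");
    }
}
```

One `record()` call feeds both values, which is why the renamed helpers mention `Rate` and `Total` together.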


Since I feel it has incorporated all of your comments, I'm going to start
the vote thread for this KIP now.


Guozhang


On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Bruno,
>
> No it was not intentional, and we can definitely add the total amount
> sensor as well -- they are just util functions to save users some lines of
> code anyways, and should be straightforward.
>
> Guozhang
>
>
> On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi Guozhang,
>>
>> I totally missed the total invocation count metric in the javadoc.
>> Which brings me to a follow-up question. Should the names of the
>> methods reflect the included total invocation count? We have to rename
>> them anyways. One option would be to simply add `Total` to the method
>> names, i.e., `addLatencyAndRateAndTotalSensor` and
>> `addRateAndTotalSensor` (alternatively without the `And`s). Since
>> those sensors record exclusively invocations, another option would be
>> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
>>
>> As far as I can see, we have sensors to record invocations but none to
>> record amounts. Is that intentional? No need to add it to this KIP, I
>> am just curious.
>>
>> Best,
>> Bruno
>>
>> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wa...@gmail.com> wrote:
>> >
>> > Hi Bruno,
>> >
>> > Just realized that for `addRateSensor` and `addLatencyAndRateSensor`
>> we've
>> > actually added the total invocation metric already.
>> >
>> >
>> > Guozhang
>> >
>> > On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wa...@gmail.com>
>> wrote:
>> >
>> > > Hi Bruno,
>> > >
>> > >
>> > > On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io>
>> wrote:
>> > >
>> > >> Hi Guozhang,
>> > >>
>> > >> I left my comments inline.
>> > >>
>> > >> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wa...@gmail.com>
>> wrote:
>> > >> >
>> > >> > Hello Bruno,
>> > >> >
>> > >> > Thanks for the feedbacks, replied inline.
>> > >> >
>> > >> > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
>> > >> wrote:
>> > >> >
>> > >> > > Hi Guozhang,
>> > >> > >
>> > >> > > Thank you for the KIP.
>> > >> > >
>> > >> > > 1) As far as I understand, the StreamsMetrics interface is there
>> for
>> > >> > > user-defined processors. Would it make sense to also add a
>> method to
>> > >> > > the interface to specify a sensor that records skipped records?
>> > >> > >
>> > >> > > Not sure I follow.. if users want to add a specific skipped
>> records
>> > >> > sensor, she can still do that as a "throughput" sensor via "
>> > >> > addThroughputSensor" and then "record" right?
>> > >> >
>> > >> > As an after-thought, maybe it's better to rename `throughput` to
>> `rate`
>> > >> in
>> > >> > the public APIs since it is really meant for the latter semantics.
>> I did
>> > >> > not change it just to make less API changes / deprecate fewer
>> functions.
>> > >> > But if we feel it is important we can change it as well.
>> > >> >
>> > >>
>> > >> I see now that a user can record the rate of skipped records.
>> However,
>> > >> I was referring to the total number of skipped records. Maybe my
>> > >> question should be more general: should we allow the user to also
>> > >> specify sensors for totals or combinations of rate and totals?
>> > >>
>> > >> Sounds good to me, I will add it to the wiki page as well for
>> > > StreamsMetrics.
>> > >
>> > >
>> > >
>> > >> Regarding the naming, I like `rate` more than `throughput`, but I
>> > >> would not fight for it.
>> > >>
>> > >> >
>> > >> > > 2) What are the semantics of active-task-process and
>> > >> standby-task-process
>> > >> > >
>> > >> > > Ah good catch, I think I made it in the wrong column. Just some
>> > >> > explanations here: Within a thread's looped iterations, it will
>> first
>> > >> try
>> > >> > to process some records from the active tasks, and then see if
>> there are
>> > >> > any standby-tasks that can be processed as well (i.e. just reading
>> from
>> > >> the
>> > >> > restore consumer and apply to the local stores). The ratio metrics
>> are
>> > >> for
>> > >> > indicating 1) what tasks (active or standby) does this thread own
>> so
>> > >> far,
>> > >> > and 2) how much time in percentage does it spend on each of them.
>> > >> >
>> > >> > But this metric should really be a task-level one that includes
>> both the
>> > >> > thread-id and task-id, and upon task migrations they will be
>> dynamically
>> > >> > deleted / (re)-created. For each task-id it may be owned by
>> multiple
>> > >> > threads as one active and others standby, and hence the separation
>> of
>> > >> > active / standby seems still necessary.
>> > >> >
>> > >>
>> > >> Makes sense.
>> > >>
>> > >>
>> > >> >
>> > >> >
>> > >> > > 3) How do dropped-late-records and expired-window-record-drop
>> relate
>> > >> > > to each other? I guess the former is for records that fall
>> outside the
>> > >> > > grace period and the latter is for records that are processed
>> after
>> > >> > > the retention period of the window. Is this correct?
>> > >> > >
>> > >> > > Yes, that's correct. The names are indeed a bit confusing since
>> they
>> > >> are
>> > >> > added at different releases historically..
>> > >> >
>> > >> > More precisely, the `grace period` is a notion of the operator
>> (hence
>> > >> the
>> > >> > metric is node-level, though it would only be used for DSL
>> operators)
>> > >> while
>> > >> > the `retention` is a notion of the store (hence the metric is
>> > >> store-level).
>> > >> > Usually grace period will be smaller than store retention though.
>> > >> >
>> > >> > Processor node is aware of `grace period` and when received a
>> record
>> > >> that
>> > >> > is older than grace deadline, it will be dropped immediately;
>> otherwise
>> > >> it
>> > >> > will still be processed a maybe a new update is "put" into the
>> store.
>> > >> The
>> > >> > store is aware of its `retention period` and then upon a "put"
>> call if
>> > >> it
>> > >> > realized it is older than the retention deadline, that put call
>> would be
>> > >> > ignored and metric is recorded.
>> > >> >
>> > >> > We have to separate them here since the window store can be used
>> in both
>> > >> > DSL and PAPI, and for the former case it would likely to be already
>> > >> ignored
>> > >> > at the processor node level due to the grace period which is
>> usually
>> > >> > smaller than retention; but for PAPI there's no grace period and
>> hence
>> > >> the
>> > >> > processor would likely still process and call "put" on the store.
>> > >> >
>> > >>
>> > >> Alright! Got it!
>> > >>
>> > >> >
>> > >> > > 4) Is there an actual difference between skipped and dropped
>> records?
>> > >> > > If not, shall we unify the terminology?
>> > >> > >
>> > >> > >
>> > >> > There is. Dropped records are only due to lateness; where as
>> skipped
>> > >> > records can be due to serde errors (and user's error handling
>> indicate
>> > >> > "skip and continue"), timestamp errors, etc.
>> > >> >
>> > >> > I've considered maybe a better (more extensible) way would be
>> defining a
>> > >> > single metric name, say skipped-records, but use different tags to
>> > >> indicate
>> > >> > if its skipping reason (errors, windowing semantics, etc). But
>> there's
>> > >> > still a tricky difference: for serde caused skipping for example,
>> they
>> > >> will
>> > >> > be skipped at the very beginning and there's no effects taken at
>> all.
>> > >> For
>> > >> > some others e.g. null-key / value at the reduce operator, it is
>> only
>> > >> > skipped at the middle of the processing, i.e. some effects may have
>> > >> already
>> > >> > been taken in up-stream sub-topologies. And that's why for
>> > >> skipped-records
>> > >> > I've defined it on both task-level and node-level and the
>> aggregate of
>> > >> the
>> > >> > latter may still be smaller than the former, whereas for
>> > >> dropped-records it
>> > >> > is only for node-level.
>> > >> >
>> > >> > So how about an even more significant change then: we enlarge the
>> > >> > `dropped-late-records` to `dropped-records` which is node-level
>> only,
>> > >> but
>> > >> > includes reasons form lateness to semantics (like null-key) as
>> well; and
>> > >> > then we have a task-level-only `skipped-records` which only record
>> those
>> > >> > dropped at the very beginning and did not make it at all to the
>> > >> processing
>> > >> > topology. I feel this is a clearer distinguishment but also a
>> bigger
>> > >> change
>> > >> > to users.
>> > >> >
>> > >>
>> > >> I like the way you dropped-records and skipped-records are now
>> > >> defined. My follow-up question is whether we should give names to
>> > >> those metrics that better describe their semantics, like:
>> > >>
>> > >> dropped-records-at-source and dropped-records-at-processor
>> > >>
>> > >> or
>> > >>
>> > >> records-dropped-at-source and records-dropped-at-processor
>> > >>
>> > >> or
>> > >>
>> > >> source-dropped-records and processor-dropped-records
>> > >>
>> > >> or alternatively with skipped. However, I would use the same term as
>> > >> in expired-window-record-drop
>> > >>
>> > >> Maybe, we should also consider to rename expired-window-record-drop
>> to
>> > >> expired-window-record-dropped to be consistent.
>> > >>
>> > >> WDYT?
>> > >>
>> > >> I was not considering "expired-window-record-drop" before since it
>> is a
>> > > store-level metric, and I was only considering task-level
>> (skipped-records)
>> > > and processor-node-level (dropped-records) metrics, and I'm using
>> different
>> > > terms deliberately to hint users that they are different leveled
>> metrics.
>> > >
>> > > I still feel that using `skip` for task-level metrics indicating that
>> this
>> > > record was not processed at all, and using `drop` for processor-level
>> > > metrics that this record is only dropped at this stage of the
>> topology is a
>> > > better one; but I'm also okay with some finer grained metrics so that
>> we
>> > > can align the processor-level with store-level (they are on the same
>> > > granularity any ways), like:
>> > >
>> > > `dropped-records-null-field`: at processor nodes
>> > >
>> > > `dropped-records-too-late`: at processor nodes
>> > >
>> > > `dropped-records-expired-window`: at window-stores
>> > >
>> > >
>> > >> >
>> > >> > > 5) What happens with removed metrics when the user sets the
>> version of
>> > >> > > "built.in.metrics.version" to 2.2-
>> > >> > >
>> > >> > > I think for those redundant ones like "forward-rate" and
>> > >> "destroy-rate"
>> > >> > we can still remove them with 2.2- as well; for other ones that are
>> > >> removed
>> > >> > / replaced like thread-level skipped-records we should still
>> maintain
>> > >> them.
>> > >> >
>> > >>
>> > >> Could you add this comment about removal of redundant metrics to the
>> > >> KIP such that it is documented somewhere?
>> > >>
>> > >> Yes, for sure.
>> > >
>> > >
>> > >>
>> > >> Best,
>> > >> Bruno
>> > >>
>> > >
>> > > I've also decided to remove the rebalance-related metrics from the
>> > > instance-level and move it to consumer itself as part of KIP-429.
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Bruno,

No, it was not intentional, and we can definitely add a total-amount
sensor as well. These are just utility functions that save users a few
lines of code anyway, so adding one should be straightforward.
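
To make the invocation-versus-amount distinction concrete, here is a minimal
sketch in plain Java. `SensorSketch` and all of its methods are hypothetical
stand-ins written for illustration, not the Kafka Streams `StreamsMetrics`
API, and the rate is simplified to the total divided by a caller-supplied
elapsed time.

```java
/** Hypothetical stand-in for a metrics sensor; NOT the Kafka Streams API. */
class SensorSketch {
    // true: every record() call adds 1; false: record() adds the given value
    private final boolean countInvocationsOnly;
    private double total = 0.0;

    private SensorSketch(boolean countInvocationsOnly) {
        this.countInvocationsOnly = countInvocationsOnly;
    }

    /** Counts how many times record() was called, ignoring the value. */
    static SensorSketch invocationSensor() {
        return new SensorSketch(true);
    }

    /** Sums the recorded values, e.g. bytes or number of records in a batch. */
    static SensorSketch amountSensor() {
        return new SensorSketch(false);
    }

    void record(double value) {
        total += countInvocationsOnly ? 1.0 : value;
    }

    double total() {
        return total;
    }

    /** Simplified rate: total per second over a caller-supplied interval. */
    double rate(double elapsedSeconds) {
        return total / elapsedSeconds;
    }

    public static void main(String[] args) {
        SensorSketch invocations = invocationSensor();
        SensorSketch bytes = amountSensor();
        for (double size : new double[] {512, 2048, 1024}) {
            invocations.record(size);
            bytes.record(size);
        }
        System.out.println("invocation total = " + invocations.total());
        System.out.println("amount total     = " + bytes.total());
        System.out.println("amount rate      = " + bytes.rate(2.0) + " per second");
    }
}
```

Both sensors see the same calls; they differ only in whether the recorded
value contributes to the total, which is the gap the question about
"amounts" points at.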

Guozhang


On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> I totally missed the total invocation count metric in the javadoc.
> Which brings me to a follow-up question. Should the names of the
> methods reflect the included total invocation count? We have to rename
> them anyways. One option would be to simply add `Total` to the method
> names, i.e., `addLatencyAndRateAndTotalSensor` and
> `addRateAndTotalSensor` (alternatively without the `And`s). Since
> those sensors record exclusively invocations, another option would be
> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
>
> As far as I can see, we have sensors to record invocations but none to
> record amounts. Is that intentional? No need to add it to this KIP, I
> am just curious.
>
> Best,
> Bruno
>
> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > Hi Bruno,
> >
> > Just realized that for `addRateSensor` and `addLatencyAndRateSensor`
> we've
> > actually added the total invocation metric already.
> >
> >
> > Guozhang
> >
> > On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hi Bruno,
> > >
> > >
> > > On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io>
> wrote:
> > >
> > >> Hi Guozhang,
> > >>
> > >> I left my comments inline.
> > >>
> > >> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> > >> >
> > >> > Hello Bruno,
> > >> >
> > >> > Thanks for the feedbacks, replied inline.
> > >> >
> > >> > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
> > >> wrote:
> > >> >
> > >> > > Hi Guozhang,
> > >> > >
> > >> > > Thank you for the KIP.
> > >> > >
> > >> > > 1) As far as I understand, the StreamsMetrics interface is there
> for
> > >> > > user-defined processors. Would it make sense to also add a method
> to
> > >> > > the interface to specify a sensor that records skipped records?
> > >> > >
> > >> > > Not sure I follow.. if users want to add a specific skipped
> records
> > >> > sensor, she can still do that as a "throughput" sensor via "
> > >> > addThroughputSensor" and then "record" right?
> > >> >
> > >> > As an after-thought, maybe it's better to rename `throughput` to
> `rate`
> > >> in
> > >> > the public APIs since it is really meant for the latter semantics.
> I did
> > >> > not change it just to make less API changes / deprecate fewer
> functions.
> > >> > But if we feel it is important we can change it as well.
> > >> >
> > >>
> > >> I see now that a user can record the rate of skipped records. However,
> > >> I was referring to the total number of skipped records. Maybe my
> > >> question should be more general: should we allow the user to also
> > >> specify sensors for totals or combinations of rate and totals?
> > >>
> > >> Sounds good to me, I will add it to the wiki page as well for
> > > StreamsMetrics.
> > >
> > >
> > >
> > >> Regarding the naming, I like `rate` more than `throughput`, but I
> > >> would not fight for it.
> > >>
> > >> >
> > >> > > 2) What are the semantics of active-task-process and
> > >> standby-task-process
> > >> > >
> > >> > > Ah good catch, I think I made it in the wrong column. Just some
> > >> > explanations here: Within a thread's looped iterations, it will
> first
> > >> try
> > >> > to process some records from the active tasks, and then see if
> there are
> > >> > any standby-tasks that can be processed as well (i.e. just reading
> from
> > >> the
> > >> > restore consumer and apply to the local stores). The ratio metrics
> are
> > >> for
> > >> > indicating 1) what tasks (active or standby) does this thread own so
> > >> far,
> > >> > and 2) how much time in percentage does it spend on each of them.
> > >> >
> > >> > But this metric should really be a task-level one that includes
> both the
> > >> > thread-id and task-id, and upon task migrations they will be
> dynamically
> > >> > deleted / (re)-created. For each task-id it may be owned by multiple
> > >> > threads as one active and others standby, and hence the separation
> of
> > >> > active / standby seems still necessary.
> > >> >
> > >>
> > >> Makes sense.
> > >>
> > >>
> > >> >
> > >> >
> > >> > > 3) How do dropped-late-records and expired-window-record-drop
> relate
> > >> > > to each other? I guess the former is for records that fall
> outside the
> > >> > > grace period and the latter is for records that are processed
> after
> > >> > > the retention period of the window. Is this correct?
> > >> > >
> > >> > > Yes, that's correct. The names are indeed a bit confusing since
> they
> > >> are
> > >> > added at different releases historically..
> > >> >
> > >> > More precisely, the `grace period` is a notion of the operator
> (hence
> > >> the
> > >> > metric is node-level, though it would only be used for DSL
> operators)
> > >> while
> > >> > the `retention` is a notion of the store (hence the metric is
> > >> store-level).
> > >> > Usually grace period will be smaller than store retention though.
> > >> >
> > >> > Processor node is aware of `grace period` and when received a record
> > >> that
> > >> > is older than grace deadline, it will be dropped immediately;
> otherwise
> > >> it
> > >> > will still be processed and maybe a new update is "put" into the
> store.
> > >> The
> > >> > store is aware of its `retention period` and then upon a "put" call
> if
> > >> it
> > >> > realized it is older than the retention deadline, that put call
> would be
> > >> > ignored and metric is recorded.
> > >> >
> > >> > We have to separate them here since the window store can be used in
> both
> > >> > DSL and PAPI, and for the former case it would likely to be already
> > >> ignored
> > >> > at the processor node level due to the grace period which is usually
> > >> > smaller than retention; but for PAPI there's no grace period and
> hence
> > >> the
> > >> > processor would likely still process and call "put" on the store.
> > >> >
> > >>
> > >> Alright! Got it!
> > >>
> > >> >
> > >> > > 4) Is there an actual difference between skipped and dropped
> records?
> > >> > > If not, shall we unify the terminology?
> > >> > >
> > >> > >
> > >> > There is. Dropped records are only due to lateness; whereas skipped
> > >> > records can be due to serde errors (and user's error handling
> indicate
> > >> > "skip and continue"), timestamp errors, etc.
> > >> >
> > >> > I've considered maybe a better (more extensible) way would be
> defining a
> > >> > single metric name, say skipped-records, but use different tags to
> > >> indicate
> > >> > if its skipping reason (errors, windowing semantics, etc). But
> there's
> > >> > still a tricky difference: for serde caused skipping for example,
> they
> > >> will
> > >> > be skipped at the very beginning and there's no effects taken at
> all.
> > >> For
> > >> > some others e.g. null-key / value at the reduce operator, it is only
> > >> > skipped at the middle of the processing, i.e. some effects may have
> > >> already
> > >> > been taken in up-stream sub-topologies. And that's why for
> > >> skipped-records
> > >> > I've defined it on both task-level and node-level and the aggregate
> of
> > >> the
> > >> > latter may still be smaller than the former, whereas for
> > >> dropped-records it
> > >> > is only for node-level.
> > >> >
> > >> > So how about an even more significant change then: we enlarge the
> > >> > `dropped-late-records` to `dropped-records` which is node-level
> only,
> > >> but
> > >> > includes reasons from lateness to semantics (like null-key) as
> well; and
> > >> > then we have a task-level-only `skipped-records` which only record
> those
> > >> > dropped at the very beginning and did not make it at all to the
> > >> processing
> > >> > topology. I feel this is a clearer distinguishment but also a bigger
> > >> change
> > >> > to users.
> > >> >
> > >>
> > >> I like the way dropped-records and skipped-records are now
> > >> defined. My follow-up question is whether we should give names to
> > >> those metrics that better describe their semantics, like:
> > >>
> > >> dropped-records-at-source and dropped-records-at-processor
> > >>
> > >> or
> > >>
> > >> records-dropped-at-source and records-dropped-at-processor
> > >>
> > >> or
> > >>
> > >> source-dropped-records and processor-dropped-records
> > >>
> > >> or alternatively with skipped. However, I would use the same term as
> > >> in expired-window-record-drop
> > >>
> > >> Maybe, we should also consider to rename expired-window-record-drop to
> > >> expired-window-record-dropped to be consistent.
> > >>
> > >> WDYT?
> > >>
> > >> I was not considering "expired-window-record-drop" before since it is
> a
> > > store-level metric, and I was only considering task-level
> (skipped-records)
> > > and processor-node-level (dropped-records) metrics, and I'm using
> different
> > > terms deliberately to hint users that they are different leveled
> metrics.
> > >
> > > I still feel that using `skip` for task-level metrics indicating that
> this
> > > record was not processed at all, and using `drop` for processor-level
> > > metrics that this record is only dropped at this stage of the topology
> is a
> > > better one; but I'm also okay with some finer grained metrics so that
> we
> > > can align the processor-level with store-level (they are on the same
> > > granularity any ways), like:
> > >
> > > `dropped-records-null-field`: at processor nodes
> > >
> > > `dropped-records-too-late`: at processor nodes
> > >
> > > `dropped-records-expired-window`: at window-stores
> > >
> > >
> > >> >
> > >> > > 5) What happens with removed metrics when the user sets the
> version of
> > >> > > "built.in.metrics.version" to 2.2-
> > >> > >
> > >> > > I think for those redundant ones like "forward-rate" and
> > >> "destroy-rate"
> > >> > we can still remove them with 2.2- as well; for other ones that are
> > >> removed
> > >> > / replaced like thread-level skipped-records we should still
> maintain
> > >> them.
> > >> >
> > >>
> > >> Could you add this comment about removal of redundant metrics to the
> > >> KIP such that it is documented somewhere?
> > >>
> > >> Yes, for sure.
> > >
> > >
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >
> > > I've also decided to remove the rebalance-related metrics from the
> > > instance-level and move it to consumer itself as part of KIP-429.
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang
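
Point 4 of the quoted discussion above separates task-level
`skipped-records` (records that never enter the processing topology) from
node-level `dropped-records` (records discarded at one processor, possibly
after upstream effects). A minimal sketch of that accounting follows, in
plain Java. The class `RecordAccounting`, its method names, and the node id
are hypothetical and are not part of Kafka Streams; the reason strings
borrow the suffixes of the metric names proposed in the thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical bookkeeping for one task; NOT the Kafka Streams implementation. */
class RecordAccounting {
    // Task level: records rejected before reaching any processor node
    // (e.g. serde errors handled with "skip and continue").
    private long skippedRecords = 0;

    // Node level: processor node id -> drop reason -> count.
    private final Map<String, Map<String, Long>> droppedRecords = new HashMap<>();

    void skipAtSource() {
        skippedRecords++;
    }

    void dropAtNode(String nodeId, String reason) {
        droppedRecords
            .computeIfAbsent(nodeId, id -> new HashMap<>())
            .merge(reason, 1L, Long::sum);
    }

    long skipped() {
        return skippedRecords;
    }

    long dropped(String nodeId, String reason) {
        return droppedRecords
            .getOrDefault(nodeId, Collections.<String, Long>emptyMap())
            .getOrDefault(reason, 0L);
    }

    public static void main(String[] args) {
        RecordAccounting task = new RecordAccounting();
        task.skipAtSource();                          // undeserializable record
        task.dropAtNode("reduce-node", "null-field"); // null key at a reduce
        task.dropAtNode("reduce-node", "too-late");   // past the grace period
        task.dropAtNode("reduce-node", "too-late");
        System.out.println("skipped = " + task.skipped());
        System.out.println("too-late at reduce-node = "
            + task.dropped("reduce-node", "too-late"));
    }
}
```

Keeping the skipped counter outside the per-node map mirrors the argument in
the thread that the two terms signal different metric levels.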

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Guozhang,

I totally missed the total invocation count metric in the javadoc.
Which brings me to a follow-up question. Should the names of the
methods reflect the included total invocation count? We have to rename
them anyway. One option would be to simply add `Total` to the method
names, i.e., `addLatencyAndRateAndTotalSensor` and
`addRateAndTotalSensor` (alternatively without the `And`s). Since
those sensors record exclusively invocations, another option would be
`addInvocationSensor` and `addInvocationSensorWithoutLatency`.

As far as I can see, we have sensors to record invocations but none to
record amounts. Is that intentional? No need to add it to this KIP, I
am just curious.

Best,
Bruno

On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> Hi Bruno,
>
> Just realized that for `addRateSensor` and `addLatencyAndRateSensor` we've
> actually added the total invocation metric already.
>
>
> Guozhang
>
> On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> >
> > On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io> wrote:
> >
> >> Hi Guozhang,
> >>
> >> I left my comments inline.
> >>
> >> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wa...@gmail.com> wrote:
> >> >
> >> > Hello Bruno,
> >> >
> >> > Thanks for the feedbacks, replied inline.
> >> >
> >> > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
> >> wrote:
> >> >
> >> > > Hi Guozhang,
> >> > >
> >> > > Thank you for the KIP.
> >> > >
> >> > > 1) As far as I understand, the StreamsMetrics interface is there for
> >> > > user-defined processors. Would it make sense to also add a method to
> >> > > the interface to specify a sensor that records skipped records?
> >> > >
> >> > > Not sure I follow.. if users want to add a specific skipped records
> >> > sensor, she can still do that as a "throughput" sensor via "
> >> > addThroughputSensor" and then "record" right?
> >> >
> >> > As an after-thought, maybe it's better to rename `throughput` to `rate`
> >> in
> >> > the public APIs since it is really meant for the latter semantics. I did
> >> > not change it just to make less API changes / deprecate fewer functions.
> >> > But if we feel it is important we can change it as well.
> >> >
> >>
> >> I see now that a user can record the rate of skipped records. However,
> >> I was referring to the total number of skipped records. Maybe my
> >> question should be more general: should we allow the user to also
> >> specify sensors for totals or combinations of rate and totals?
> >>
> >> Sounds good to me, I will add it to the wiki page as well for
> > StreamsMetrics.
> >
> >
> >
> >> Regarding the naming, I like `rate` more than `throughput`, but I
> >> would not fight for it.
> >>
> >> >
> >> > > 2) What are the semantics of active-task-process and
> >> standby-task-process
> >> > >
> >> > > Ah good catch, I think I made it in the wrong column. Just some
> >> > explanations here: Within a thread's looped iterations, it will first
> >> try
> >> > to process some records from the active tasks, and then see if there are
> >> > any standby-tasks that can be processed as well (i.e. just reading from
> >> the
> >> > restore consumer and apply to the local stores). The ratio metrics are
> >> for
> >> > indicating 1) what tasks (active or standby) does this thread own so
> >> far,
> >> > and 2) how much time in percentage does it spend on each of them.
> >> >
> >> > But this metric should really be a task-level one that includes both the
> >> > thread-id and task-id, and upon task migrations they will be dynamically
> >> > deleted / (re)-created. For each task-id it may be owned by multiple
> >> > threads as one active and others standby, and hence the separation of
> >> > active / standby seems still necessary.
> >> >
> >>
> >> Makes sense.
> >>
> >>
> >> >
> >> >
> >> > > 3) How do dropped-late-records and expired-window-record-drop relate
> >> > > to each other? I guess the former is for records that fall outside the
> >> > > grace period and the latter is for records that are processed after
> >> > > the retention period of the window. Is this correct?
> >> > >
> >> > > Yes, that's correct. The names are indeed a bit confusing since they
> >> are
> >> > added at different releases historically..
> >> >
> >> > More precisely, the `grace period` is a notion of the operator (hence
> >> the
> >> > metric is node-level, though it would only be used for DSL operators)
> >> while
> >> > the `retention` is a notion of the store (hence the metric is
> >> store-level).
> >> > Usually grace period will be smaller than store retention though.
> >> >
> >> > Processor node is aware of `grace period` and when received a record
> >> that
> >> > is older than grace deadline, it will be dropped immediately; otherwise
> >> it
> >> > will still be processed and maybe a new update is "put" into the store.
> >> The
> >> > store is aware of its `retention period` and then upon a "put" call if
> >> it
> >> > realized it is older than the retention deadline, that put call would be
> >> > ignored and metric is recorded.
> >> >
> >> > We have to separate them here since the window store can be used in both
> >> > DSL and PAPI, and for the former case it would likely to be already
> >> ignored
> >> > at the processor node level due to the grace period which is usually
> >> > smaller than retention; but for PAPI there's no grace period and hence
> >> the
> >> > processor would likely still process and call "put" on the store.
> >> >
> >>
> >> Alright! Got it!
> >>
> >> >
> >> > > 4) Is there an actual difference between skipped and dropped records?
> >> > > If not, shall we unify the terminology?
> >> > >
> >> > >
> >> > There is. Dropped records are only due to lateness; whereas skipped
> >> > records can be due to serde errors (and user's error handling indicate
> >> > "skip and continue"), timestamp errors, etc.
> >> >
> >> > I've considered maybe a better (more extensible) way would be defining a
> >> > single metric name, say skipped-records, but use different tags to
> >> indicate
> >> > if its skipping reason (errors, windowing semantics, etc). But there's
> >> > still a tricky difference: for serde caused skipping for example, they
> >> will
> >> > be skipped at the very beginning and there's no effects taken at all.
> >> For
> >> > some others e.g. null-key / value at the reduce operator, it is only
> >> > skipped at the middle of the processing, i.e. some effects may have
> >> already
> >> > been taken in up-stream sub-topologies. And that's why for
> >> skipped-records
> >> > I've defined it on both task-level and node-level and the aggregate of
> >> the
> >> > latter may still be smaller than the former, whereas for
> >> dropped-records it
> >> > is only for node-level.
> >> >
> >> > So how about an even more significant change then: we enlarge the
> >> > `dropped-late-records` to `dropped-records` which is node-level only,
> >> but
> >> > includes reasons from lateness to semantics (like null-key) as well; and
> >> > then we have a task-level-only `skipped-records` which only record those
> >> > dropped at the very beginning and did not make it at all to the
> >> processing
> >> > topology. I feel this is a clearer distinguishment but also a bigger
> >> change
> >> > to users.
> >> >
> >>
> >> I like the way dropped-records and skipped-records are now
> >> defined. My follow-up question is whether we should give names to
> >> those metrics that better describe their semantics, like:
> >>
> >> dropped-records-at-source and dropped-records-at-processor
> >>
> >> or
> >>
> >> records-dropped-at-source and records-dropped-at-processor
> >>
> >> or
> >>
> >> source-dropped-records and processor-dropped-records
> >>
> >> or alternatively with skipped. However, I would use the same term as
> >> in expired-window-record-drop
> >>
> >> Maybe, we should also consider to rename expired-window-record-drop to
> >> expired-window-record-dropped to be consistent.
> >>
> >> WDYT?
> >>
> >> I was not considering "expired-window-record-drop" before since it is a
> > store-level metric, and I was only considering task-level (skipped-records)
> > and processor-node-level (dropped-records) metrics, and I'm using different
> > terms deliberately to hint users that they are different leveled metrics.
> >
> > I still feel that using `skip` for task-level metrics indicating that this
> > record was not processed at all, and using `drop` for processor-level
> > metrics that this record is only dropped at this stage of the topology is a
> > better one; but I'm also okay with some finer grained metrics so that we
> > can align the processor-level with store-level (they are on the same
> > granularity any ways), like:
> >
> > `dropped-records-null-field`: at processor nodes
> >
> > `dropped-records-too-late`: at processor nodes
> >
> > `dropped-records-expired-window`: at window-stores
> >
> >
> >> >
> >> > > 5) What happens with removed metrics when the user sets the version of
> >> > > "built.in.metrics.version" to 2.2-
> >> > >
> >> > > I think for those redundant ones like "forward-rate" and
> >> "destroy-rate"
> >> > we can still remove them with 2.2- as well; for other ones that are
> >> removed
> >> > / replaced like thread-level skipped-records we should still maintain
> >> them.
> >> >
> >>
> >> Could you add this comment about removal of redundant metrics to the
> >> KIP such that it is documented somewhere?
> >>
> >> Yes, for sure.
> >
> >
> >>
> >> Best,
> >> Bruno
> >>
> >
> > I've also decided to remove the rebalance-related metrics from the
> > instance-level and move it to consumer itself as part of KIP-429.
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> -- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Bruno,

I just realized that for `addRateSensor` and `addLatencyAndRateSensor` we've
actually added the total invocation metric already.


Guozhang

On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Bruno,
>
>
> On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi Guozhang,
>>
>> I left my comments inline.
>>
>> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wa...@gmail.com> wrote:
>> >
>> > Hello Bruno,
>> >
>> > Thanks for the feedbacks, replied inline.
>> >
>> > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
>> wrote:
>> >
>> > > Hi Guozhang,
>> > >
>> > > Thank you for the KIP.
>> > >
>> > > 1) As far as I understand, the StreamsMetrics interface is there for
>> > > user-defined processors. Would it make sense to also add a method to
>> > > the interface to specify a sensor that records skipped records?
>> > >
>> > > Not sure I follow.. if users want to add a specific skipped records
>> > sensor, she can still do that as a "throughput" sensor via "
>> > addThroughputSensor" and then "record" right?
>> >
>> > As an after-thought, maybe it's better to rename `throughput` to `rate`
>> in
>> > the public APIs since it is really meant for the latter semantics. I did
>> > not change it just to make less API changes / deprecate fewer functions.
>> > But if we feel it is important we can change it as well.
>> >
>>
>> I see now that a user can record the rate of skipped records. However,
>> I was referring to the total number of skipped records. Maybe my
>> question should be more general: should we allow the user to also
>> specify sensors for totals or combinations of rate and totals?
>>
>> Sounds good to me, I will add it to the wiki page as well for
> StreamsMetrics.
>
>
>
>> Regarding the naming, I like `rate` more than `throughput`, but I
>> would not fight for it.
>>
>> >
>> > > 2) What are the semantics of active-task-process and
>> standby-task-process
>> > >
>> > > Ah good catch, I think I made it in the wrong column. Just some
>> > explanations here: Within a thread's looped iterations, it will first
>> try
>> > to process some records from the active tasks, and then see if there are
>> > any standby-tasks that can be processed as well (i.e. just reading from
>> the
>> > restore consumer and apply to the local stores). The ratio metrics are
>> for
>> > indicating 1) what tasks (active or standby) does this thread own so
>> far,
>> > and 2) how much time in percentage does it spend on each of them.
>> >
>> > But this metric should really be a task-level one that includes both the
>> > thread-id and task-id, and upon task migrations they will be dynamically
>> > deleted / (re)-created. For each task-id it may be owned by multiple
>> > threads as one active and others standby, and hence the separation of
>> > active / standby seems still necessary.
>> >
>>
>> Makes sense.
>>
>>
>> >
>> >
>> > > 3) How do dropped-late-records and expired-window-record-drop relate
>> > > to each other? I guess the former is for records that fall outside the
>> > > grace period and the latter is for records that are processed after
>> > > the retention period of the window. Is this correct?
>> > >
>> > > Yes, that's correct. The names are indeed a bit confusing since they
>> are
>> > added at different releases historically..
>> >
>> > More precisely, the `grace period` is a notion of the operator (hence
>> the
>> > metric is node-level, though it would only be used for DSL operators)
>> while
>> > the `retention` is a notion of the store (hence the metric is
>> store-level).
>> > Usually grace period will be smaller than store retention though.
>> >
>> > Processor node is aware of `grace period` and when received a record
>> that
>> > is older than grace deadline, it will be dropped immediately; otherwise
>> it
>> > will still be processed and maybe a new update is "put" into the store.
>> The
>> > store is aware of its `retention period` and then upon a "put" call if
>> it
>> > realized it is older than the retention deadline, that put call would be
>> > ignored and metric is recorded.
>> >
>> > We have to separate them here since the window store can be used in both
>> > DSL and PAPI, and for the former case it would likely to be already
>> ignored
>> > at the processor node level due to the grace period which is usually
>> > smaller than retention; but for PAPI there's no grace period and hence
>> the
>> > processor would likely still process and call "put" on the store.
>> >
>>
>> Alright! Got it!
>>
>> >
>> > > 4) Is there an actual difference between skipped and dropped records?
>> > > If not, shall we unify the terminology?
>> > >
>> > >
>> > There is. Dropped records are only due to lateness; whereas skipped
>> > records can be due to serde errors (and user's error handling indicate
>> > "skip and continue"), timestamp errors, etc.
>> >
>> > I've considered maybe a better (more extensible) way would be defining a
>> > single metric name, say skipped-records, but use different tags to
>> indicate
>> > if its skipping reason (errors, windowing semantics, etc). But there's
>> > still a tricky difference: for serde caused skipping for example, they
>> will
>> > be skipped at the very beginning and there's no effects taken at all.
>> For
>> > some others e.g. null-key / value at the reduce operator, it is only
>> > skipped at the middle of the processing, i.e. some effects may have
>> already
>> > been taken in up-stream sub-topologies. And that's why for
>> skipped-records
>> > I've defined it on both task-level and node-level and the aggregate of
>> the
>> > latter may still be smaller than the former, whereas for
>> dropped-records it
>> > is only for node-level.
>> >
>> > So how about an even more significant change then: we enlarge the
>> > `dropped-late-records` to `dropped-records` which is node-level only,
>> but
>> > includes reasons from lateness to semantics (like null-key) as well; and
>> > then we have a task-level-only `skipped-records` which only record those
>> > dropped at the very beginning and did not make it at all to the
>> processing
>> > topology. I feel this is a clearer distinguishment but also a bigger
>> change
>> > to users.
>> >
>>
>> I like the way dropped-records and skipped-records are now
>> defined. My follow-up question is whether we should give names to
>> those metrics that better describe their semantics, like:
>>
>> dropped-records-at-source and dropped-records-at-processor
>>
>> or
>>
>> records-dropped-at-source and records-dropped-at-processor
>>
>> or
>>
>> source-dropped-records and processor-dropped-records
>>
>> or alternatively with skipped. However, I would use the same term as
>> in expired-window-record-drop
>>
>> Maybe, we should also consider to rename expired-window-record-drop to
>> expired-window-record-dropped to be consistent.
>>
>> WDYT?
>>
>> I was not considering "expired-window-record-drop" before since it is a
> store-level metric, and I was only considering task-level (skipped-records)
> and processor-node-level (dropped-records) metrics, and I'm using different
> terms deliberately to hint users that they are different leveled metrics.
>
> I still feel that using `skip` for task-level metrics indicating that this
> record was not processed at all, and using `drop` for processor-level
> metrics that this record is only dropped at this stage of the topology is a
> better one; but I'm also okay with some finer grained metrics so that we
> can align the processor-level with store-level (they are on the same
> granularity any ways), like:
>
> `dropped-records-null-field`: at processor nodes
>
> `dropped-records-too-late`: at processor nodes
>
> `dropped-records-expired-window`: at window-stores
>
>
>> >
>> > > 5) What happens with removed metrics when the user sets the version of
>> > > "built.in.metrics.version" to 2.2-
>> > >
>> > > I think for those redundant ones like "forward-rate" and
>> "destroy-rate"
>> > we can still remove them with 2.2- as well; for other ones that are
>> removed
>> > / replaced like thread-level skipped-records we should still maintain
>> them.
>> >
>>
>> Could you add this comment about removal of redundant metrics to the
>> KIP such that it is documented somewhere?
>>
>> Yes, for sure.
>
>
>>
>> Best,
>> Bruno
>>
>
> I've also decided to remove the rebalance-related metrics from the
> instance-level and move it to consumer itself as part of KIP-429.
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang
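
Point 3 of the quoted discussion above describes two separate checks: the
operator drops a record that is past the grace deadline, and the store
ignores a "put" that is past the retention deadline. A minimal sketch of
that two-stage check follows, in plain Java. `LatenessSketch`, its methods,
and the `Outcome` names are hypothetical, and the deadlines are simplified
to "stream time minus period" on raw record timestamps; the actual Kafka
Streams logic works on window boundaries and is not reproduced here.

```java
/** Hypothetical model of grace vs. retention checks; NOT Kafka Streams code. */
class LatenessSketch {
    enum Outcome { STORED, DROPPED_TOO_LATE, DROPPED_EXPIRED_WINDOW }

    private final long retentionMs;
    // Highest timestamp observed so far; assumes non-negative timestamps.
    private long streamTimeMs = -1L;

    LatenessSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Store-level check, as a PAPI processor would hit it on every put. */
    Outcome put(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        if (timestampMs < streamTimeMs - retentionMs) {
            return Outcome.DROPPED_EXPIRED_WINDOW; // store ignores the put
        }
        return Outcome.STORED;
    }

    /** DSL-style operator: checks the grace deadline before touching the store. */
    Outcome processWithGrace(long timestampMs, long graceMs) {
        long observed = Math.max(streamTimeMs, timestampMs);
        if (timestampMs < observed - graceMs) {
            streamTimeMs = observed;
            return Outcome.DROPPED_TOO_LATE; // never reaches the store
        }
        return put(timestampMs);
    }

    public static void main(String[] args) {
        LatenessSketch window = new LatenessSketch(100L); // retention = 100 ms
        long graceMs = 10L;                               // grace < retention
        System.out.println(window.processWithGrace(1000L, graceMs));
        System.out.println(window.processWithGrace(985L, graceMs));
        System.out.println(window.put(950L)); // PAPI path: no grace check
        System.out.println(window.put(850L));
    }
}
```

With grace smaller than retention, the DSL path rejects late records at the
operator before the store ever sees them, so in this model only the direct
`put` path can trigger the store-level drop.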

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Bruno,


On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> I left my comments inline.
>
> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > Hello Bruno,
> >
> > Thanks for the feedbacks, replied inline.
> >
> > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thank you for the KIP.
> > >
> > > 1) As far as I understand, the StreamsMetrics interface is there for
> > > user-defined processors. Would it make sense to also add a method to
> > > the interface to specify a sensor that records skipped records?
> > >
> > Not sure I follow. If users want to add a specific skipped-records
> > sensor, they can still do that as a "throughput" sensor via
> > "addThroughputSensor" and then "record", right?
> >
> > As an after-thought, maybe it's better to rename `throughput` to `rate`
> in
> > the public APIs since it is really meant for the latter semantics. I did
> > not change it just to make less API changes / deprecate fewer functions.
> > But if we feel it is important we can change it as well.
> >
>
> I see now that a user can record the rate of skipped records. However,
> I was referring to the total number of skipped records. Maybe my
> question should be more general: should we allow the user to also
> specify sensors for totals or combinations of rate and totals?
>

Sounds good to me; I will add it to the wiki page for StreamsMetrics as well.



> Regarding the naming, I like `rate` more than `throughput`, but I
> would not fight for it.
>
> >
> > > 2) What are the semantics of active-task-process and
> standby-task-process
> > >
> > > Ah good catch, I think I made it in the wrong column. Just some
> > explanations here: Within a thread's looped iterations, it will first try
> > to process some records from the active tasks, and then see if there are
> > any standby-tasks that can be processed as well (i.e. just reading from
> the
> > restore consumer and apply to the local stores). The ratio metrics are
> for
> > indicating 1) what tasks (active or standby) does this thread own so far,
> > and 2) how much time in percentage does it spend on each of them.
> >
> > But this metric should really be a task-level one that includes both the
> > thread-id and task-id, and upon task migrations they will be dynamically
> > deleted / (re)-created. For each task-id it may be owned by multiple
> > threads as one active and others standby, and hence the separation of
> > active / standby seems still necessary.
> >
>
> Makes sense.
>
>
> >
> >
> > > 3) How do dropped-late-records and expired-window-record-drop relate
> > > to each other? I guess the former is for records that fall outside the
> > > grace period and the latter is for records that are processed after
> > > the retention period of the window. Is this correct?
> > >
> > > Yes, that's correct. The names are indeed a bit confusing since they
> are
> > added at different releases historically..
> >
> > More precisely, the `grace period` is a notion of the operator (hence the
> > metric is node-level, though it would only be used for DSL operators)
> while
> > the `retention` is a notion of the store (hence the metric is
> store-level).
> > Usually grace period will be smaller than store retention though.
> >
> > Processor node is aware of `grace period` and when received a record that
> > is older than grace deadline, it will be dropped immediately; otherwise
> it
> > will still be processed and maybe a new update is "put" into the store. The
> > store is aware of its `retention period` and then upon a "put" call if it
> > realized it is older than the retention deadline, that put call would be
> > ignored and metric is recorded.
> >
> > We have to separate them here since the window store can be used in both
> > DSL and PAPI, and for the former case it would likely to be already
> ignored
> > at the processor node level due to the grace period which is usually
> > smaller than retention; but for PAPI there's no grace period and hence
> the
> > processor would likely still process and call "put" on the store.
> >
>
> Alright! Got it!
>
> >
> > > 4) Is there an actual difference between skipped and dropped records?
> > > If not, shall we unify the terminology?
> > >
> > >
> > There is. Dropped records are only due to lateness; whereas skipped
> > records can be due to serde errors (and user's error handling indicate
> > "skip and continue"), timestamp errors, etc.
> >
> > I've considered maybe a better (more extensible) way would be defining a
> > single metric name, say skipped-records, but use different tags to
> indicate
> > if its skipping reason (errors, windowing semantics, etc). But there's
> > still a tricky difference: for serde caused skipping for example, they
> will
> > be skipped at the very beginning and there's no effects taken at all. For
> > some others e.g. null-key / value at the reduce operator, it is only
> > skipped at the middle of the processing, i.e. some effects may have
> already
> > been taken in up-stream sub-topologies. And that's why for
> skipped-records
> > I've defined it on both task-level and node-level and the aggregate of
> the
> > latter may still be smaller than the former, whereas for dropped-records
> it
> > is only for node-level.
> >
> > So how about an even more significant change then: we enlarge the
> > `dropped-late-records` to `dropped-records` which is node-level only, but
> > includes reasons from lateness to semantics (like null-key) as well; and
> > then we have a task-level-only `skipped-records` which only record those
> > dropped at the very beginning and did not make it at all to the
> processing
> > topology. I feel this is a clearer distinction but also a bigger
> change
> > to users.
> >
>
> I like the way dropped-records and skipped-records are now
> defined. My follow-up question is whether we should give names to
> those metrics that better describe their semantics, like:
>
> dropped-records-at-source and dropped-records-at-processor
>
> or
>
> records-dropped-at-source and records-dropped-at-processor
>
> or
>
> source-dropped-records and processor-dropped-records
>
> or alternatively with skipped. However, I would use the same term as
> in expired-window-record-drop
>
> Maybe, we should also consider to rename expired-window-record-drop to
> expired-window-record-dropped to be consistent.
>
> WDYT?
>
I was not considering "expired-window-record-drop" before since it is a
store-level metric, and I was only considering task-level (skipped-records)
and processor-node-level (dropped-records) metrics. I'm using different
terms deliberately to hint to users that they are metrics at different
levels.

I still feel it is better to use `skip` for task-level metrics, indicating
that the record was not processed at all, and `drop` for processor-level
metrics, indicating that the record is only dropped at this stage of the
topology. But I'm also okay with some finer-grained metrics so that we
can align the processor-level with the store-level (they are at the same
granularity anyway), like:

`dropped-records-null-field`: at processor nodes

`dropped-records-too-late`: at processor nodes

`dropped-records-expired-window`: at window-stores


> >
> > > 5) What happens with removed metrics when the user sets the version of
> > > "built.in.metrics.version" to 2.2-
> > >
> > > I think for those redundant ones like ""forward-rate" and
> "destroy-rate"
> > we can still remove them with 2.2- as well; for other ones that are
> removed
> > / replaced like thread-level skipped-records we should still maintain
> them.
> >
>
> Could you add this comment about removal of redundant metrics to the
> KIP so that it is documented somewhere?
>
> Yes, for sure.


>
> Best,
> Bruno
>

I've also decided to remove the rebalance-related metrics from the
instance-level and move them to the consumer itself as part of KIP-429.


-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Guozhang,

I left my comments inline.

On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> Hello Bruno,
>
> Thanks for the feedbacks, replied inline.
>
> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:
>
> > Hi Guozhang,
> >
> > Thank you for the KIP.
> >
> > 1) As far as I understand, the StreamsMetrics interface is there for
> > user-defined processors. Would it make sense to also add a method to
> > the interface to specify a sensor that records skipped records?
> >
> > Not sure I follow.. if users want to add a specific skipped records
> sensor, she can still do that as a "throughput" sensor via "
> addThroughputSensor" and then "record" right?
>
> As an after-thought, maybe it's better to rename `throughput` to `rate` in
> the public APIs since it is really meant for the latter semantics. I did
> not change it just to make less API changes / deprecate fewer functions.
> But if we feel it is important we can change it as well.
>

I see now that a user can record the rate of skipped records. However,
I was referring to the total number of skipped records. Maybe my
question should be more general: should we allow the user to also
specify sensors for totals or combinations of rate and totals?

Regarding the naming, I like `rate` more than `throughput`, but I
would not fight for it.
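
To make the rate-versus-total distinction concrete, here is a minimal
plain-Java sketch. It is not the StreamsMetrics API: the class
`RateAndTotalSensor`, its methods, and the fixed time window are all
hypothetical names and choices made up for illustration. The point is only
that a rate decays as the window moves on, while a total never does.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy sensor (an assumption for illustration, NOT Kafka's API).
// It tracks both a windowed rate and a monotonically increasing total.
final class RateAndTotalSensor {
    private final long windowMs;
    private final Deque<Long> timestamps = new ArrayDeque<>();
    private long total = 0;

    RateAndTotalSensor(long windowMs) {
        this.windowMs = windowMs;
    }

    // Record one event (e.g. one skipped record) observed at nowMs.
    void record(long nowMs) {
        total++;
        timestamps.addLast(nowMs);
        expire(nowMs);
    }

    // Total number of events ever recorded; never decreases.
    long total() {
        return total;
    }

    // Events per second over the most recent window.
    double ratePerSecond(long nowMs) {
        expire(nowMs);
        return timestamps.size() * 1000.0 / windowMs;
    }

    // Forget events that have fallen out of the window.
    private void expire(long nowMs) {
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= nowMs - windowMs) {
            timestamps.removeFirst();
        }
    }
}
```

A user interested in "how many records were skipped since startup" would
read `total()`, whereas `ratePerSecond()` only answers "how many are being
skipped right now".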

>
> > 2) What are the semantics of active-task-process and standby-task-process
> >
> > Ah good catch, I think I made it in the wrong column. Just some
> explanations here: Within a thread's looped iterations, it will first try
> to process some records from the active tasks, and then see if there are
> any standby-tasks that can be processed as well (i.e. just reading from the
> restore consumer and apply to the local stores). The ratio metrics are for
> indicating 1) what tasks (active or standby) does this thread own so far,
> and 2) how much time in percentage does it spend on each of them.
>
> But this metric should really be a task-level one that includes both the
> thread-id and task-id, and upon task migrations they will be dynamically
> deleted / (re)-created. For each task-id it may be owned by multiple
> threads as one active and others standby, and hence the separation of
> active / standby seems still necessary.
>

Makes sense.


>
>
> > 3) How do dropped-late-records and expired-window-record-drop relate
> > to each other? I guess the former is for records that fall outside the
> > grace period and the latter is for records that are processed after
> > the retention period of the window. Is this correct?
> >
> > Yes, that's correct. The names are indeed a bit confusing since they are
> added at different releases historically..
>
> More precisely, the `grace period` is a notion of the operator (hence the
> metric is node-level, though it would only be used for DSL operators) while
> the `retention` is a notion of the store (hence the metric is store-level).
> Usually grace period will be smaller than store retention though.
>
> Processor node is aware of `grace period` and when received a record that
> is older than grace deadline, it will be dropped immediately; otherwise it
> will still be processed and maybe a new update is "put" into the store. The
> store is aware of its `retention period` and then upon a "put" call if it
> realized it is older than the retention deadline, that put call would be
> ignored and metric is recorded.
>
> We have to separate them here since the window store can be used in both
> DSL and PAPI, and for the former case it would likely to be already ignored
> at the processor node level due to the grace period which is usually
> smaller than retention; but for PAPI there's no grace period and hence the
> processor would likely still process and call "put" on the store.
>

Alright! Got it!

>
> > 4) Is there an actual difference between skipped and dropped records?
> > If not, shall we unify the terminology?
> >
> >
> There is. Dropped records are only due to lateness; whereas skipped
> records can be due to serde errors (and user's error handling indicate
> "skip and continue"), timestamp errors, etc.
>
> I've considered maybe a better (more extensible) way would be defining a
> single metric name, say skipped-records, but use different tags to indicate
> if its skipping reason (errors, windowing semantics, etc). But there's
> still a tricky difference: for serde caused skipping for example, they will
> be skipped at the very beginning and there's no effects taken at all. For
> some others e.g. null-key / value at the reduce operator, it is only
> skipped at the middle of the processing, i.e. some effects may have already
> been taken in up-stream sub-topologies. And that's why for skipped-records
> I've defined it on both task-level and node-level and the aggregate of the
> latter may still be smaller than the former, whereas for dropped-records it
> is only for node-level.
>
> So how about an even more significant change then: we enlarge the
> `dropped-late-records` to `dropped-records` which is node-level only, but
> includes reasons from lateness to semantics (like null-key) as well; and
> then we have a task-level-only `skipped-records` which only record those
> dropped at the very beginning and did not make it at all to the processing
> topology. I feel this is a clearer distinction but also a bigger change
> to users.
>

I like the way dropped-records and skipped-records are now
defined. My follow-up question is whether we should give names to
those metrics that better describe their semantics, like:

dropped-records-at-source and dropped-records-at-processor

or

records-dropped-at-source and records-dropped-at-processor

or

source-dropped-records and processor-dropped-records

or alternatively with skipped. However, I would use the same term as
in expired-window-record-drop.

Maybe we should also consider renaming expired-window-record-drop to
expired-window-record-dropped to be consistent.

WDYT?

>
> > 5) What happens with removed metrics when the user sets the version of
> > "built.in.metrics.version" to 2.2-
> >
> > I think for those redundant ones like ""forward-rate" and "destroy-rate"
> we can still remove them with 2.2- as well; for other ones that are removed
> / replaced like thread-level skipped-records we should still maintain them.
>

Could you add this comment about removal of redundant metrics to the
KIP so that it is documented somewhere?


Best,
Bruno

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Bruno,

Thanks for the feedback, replied inline.

On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> Thank you for the KIP.
>
> 1) As far as I understand, the StreamsMetrics interface is there for
> user-defined processors. Would it make sense to also add a method to
> the interface to specify a sensor that records skipped records?
>
Not sure I follow. If users want to add a specific skipped-records
sensor, they can still do that as a "throughput" sensor via
"addThroughputSensor" and then "record", right?

As an afterthought, maybe it's better to rename `throughput` to `rate` in
the public APIs since it is really meant for the latter semantics. I did
not change it just to make fewer API changes / deprecate fewer functions.
But if we feel it is important we can change it as well.


> 2) What are the semantics of active-task-process and standby-task-process
>
Ah, good catch, I think I put it in the wrong column. Some explanation
here: within each iteration of a thread's loop, it will first try
to process some records from the active tasks, and then see if there are
any standby tasks that can be processed as well (i.e. just reading from the
restore consumer and applying to the local stores). The ratio metrics are
for indicating 1) which tasks (active or standby) this thread owns so far,
and 2) what percentage of its time it spends on each of them.

But this metric should really be a task-level one that includes both the
thread-id and task-id, and upon task migrations they will be dynamically
deleted / (re)-created. Each task-id may be owned by multiple threads, as
active on one and standby on the others, and hence the separation of
active / standby still seems necessary.
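
As a rough illustration of the task-level ratio described above, here is a
small plain-Java sketch. Everything in it is an assumption for illustration:
`ProcessRatioTracker`, its method names, and the idea of keying by
(thread-id, task-id, active/standby) are hypothetical and do not mirror the
actual Streams implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-thread bookkeeping of time spent on each owned task.
final class ProcessRatioTracker {
    // thread-id -> (task key -> nanoseconds spent processing that task)
    private final Map<String, Map<String, Long>> nanosByThread = new HashMap<>();

    private static String taskKey(String taskId, boolean active) {
        return taskId + (active ? ":active" : ":standby");
    }

    // Called after a thread spent `nanos` processing the given task.
    void record(String threadId, String taskId, boolean active, long nanos) {
        nanosByThread.computeIfAbsent(threadId, t -> new HashMap<>())
                     .merge(taskKey(taskId, active), nanos, Long::sum);
    }

    // Called when a task migrates away: its metric entry is deleted.
    void remove(String threadId, String taskId, boolean active) {
        Map<String, Long> tasks = nanosByThread.get(threadId);
        if (tasks != null) {
            tasks.remove(taskKey(taskId, active));
        }
    }

    // Fraction of the thread's recorded time that went to this task.
    double ratio(String threadId, String taskId, boolean active) {
        Map<String, Long> tasks =
            nanosByThread.getOrDefault(threadId, Collections.<String, Long>emptyMap());
        long sum = 0;
        for (long n : tasks.values()) {
            sum += n;
        }
        Long own = tasks.get(taskKey(taskId, active));
        return (own == null || sum == 0) ? 0.0 : (double) own / sum;
    }
}
```

Keeping active and standby as separate keys means the same task-id can show
up once per role on different threads without the entries colliding.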



> 3) How do dropped-late-records and expired-window-record-drop relate
> to each other? I guess the former is for records that fall outside the
> grace period and the latter is for records that are processed after
> the retention period of the window. Is this correct?
>
Yes, that's correct. The names are indeed a bit confusing since they were
added in different releases historically.

More precisely, the `grace period` is a notion of the operator (hence the
metric is node-level, though it would only be used for DSL operators) while
the `retention` is a notion of the store (hence the metric is store-level).
Usually the grace period will be smaller than the store retention, though.

The processor node is aware of the `grace period`, and when it receives a
record that is older than the grace deadline, the record is dropped
immediately; otherwise it will still be processed and maybe a new update is
"put" into the store. The store is aware of its `retention period`, and upon
a "put" call, if it realizes the record is older than the retention
deadline, that put call is ignored and the metric is recorded.

We have to separate them here since the window store can be used in both
the DSL and PAPI. In the former case the record would likely already be
ignored at the processor node level due to the grace period, which is
usually smaller than retention; but for PAPI there's no grace period and
hence the processor would likely still process it and call "put" on the
store.
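
The two drop points can be sketched in plain Java as follows. This is a
simplified model under stated assumptions, not the Streams code: the classes
`ToyWindowStore` and `ToyGraceProcessor` are hypothetical, windows are
ignored, and "older than the deadline" is modelled as the record timestamp
being more than grace / retention behind the highest timestamp seen so far.

```java
// Hypothetical store: drops puts that are older than its retention deadline.
final class ToyWindowStore {
    private final long retentionMs;
    private long observedTimeMs = Long.MIN_VALUE;
    long expiredDrops = 0;   // stands in for a store-level drop metric
    long accepted = 0;

    ToyWindowStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    boolean put(long recordTsMs) {
        observedTimeMs = Math.max(observedTimeMs, recordTsMs);
        if (recordTsMs < observedTimeMs - retentionMs) {
            expiredDrops++;  // put is ignored, metric is recorded
            return false;
        }
        accepted++;
        return true;
    }
}

// Hypothetical DSL-like operator: drops late records before touching the store.
final class ToyGraceProcessor {
    private final long graceMs;
    private final ToyWindowStore store;
    private long streamTimeMs = Long.MIN_VALUE;
    long lateDrops = 0;      // stands in for a node-level drop metric

    ToyGraceProcessor(long graceMs, ToyWindowStore store) {
        this.graceMs = graceMs;
        this.store = store;
    }

    void process(long recordTsMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTsMs);
        if (recordTsMs < streamTimeMs - graceMs) {
            lateDrops++;     // dropped at the node, store never sees it
            return;
        }
        store.put(recordTsMs);
    }
}
```

With a grace period shorter than the retention, the DSL-like path counts late
records at the node and the store counter stays at zero. A PAPI-style caller
that invokes `put` directly has no grace check, so only the store-level
counter moves.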


> 4) Is there an actual difference between skipped and dropped records?
> If not, shall we unify the terminology?
>
>
There is. Dropped records are only due to lateness; whereas skipped
records can be due to serde errors (where the user's error handling
indicates "skip and continue"), timestamp errors, etc.

I've considered that a better (more extensible) way might be defining a
single metric name, say skipped-records, but using different tags to
indicate the skipping reason (errors, windowing semantics, etc). But there's
still a tricky difference: for serde-caused skipping, for example, records
will be skipped at the very beginning and no effects are taken at all. For
some others, e.g. null-key / value at the reduce operator, the record is
only skipped in the middle of the processing, i.e. some effects may have
already been taken in upstream sub-topologies. And that's why for
skipped-records I've defined it on both task-level and node-level, and the
aggregate of the latter may still be smaller than the former, whereas
dropped-records is only node-level.

So how about an even more significant change then: we enlarge
`dropped-late-records` to `dropped-records`, which is node-level only but
includes reasons from lateness to semantics (like null-key) as well; and
then we have a task-level-only `skipped-records` which only records those
dropped at the very beginning that did not make it at all to the processing
topology. I feel this is a clearer distinction but also a bigger change
for users.
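
A minimal sketch of the proposed split, assuming a toy task with one
upstream step and one reduce step. The class `ToyTask`, its fields, and the
use of integer parsing as a stand-in for deserialization are all
hypothetical; only the metric names `skipped-records` and `dropped-records`
come from the discussion.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical task: counts task-level skips and node-level drops separately.
final class ToyTask {
    long skippedRecords = 0;                                         // task-level
    final Map<String, Long> droppedRecordsByNode = new HashMap<>();  // node-level
    long upstreamSeen = 0;                                           // effect already taken upstream
    final Map<String, Integer> sums = new HashMap<>();               // reduce state

    void process(String key, String rawValue) {
        Integer value;
        try {
            // Stand-in for deserialization; failure means "skip and continue".
            value = Integer.valueOf(rawValue);
        } catch (NumberFormatException e) {
            skippedRecords++;   // never entered the topology, no effects at all
            return;
        }

        upstreamSeen++;         // an upstream node has already acted on the record

        if (key == null) {
            // Dropped in the middle of processing, at the reduce node.
            droppedRecordsByNode.merge("reduce", 1L, Long::sum);
            return;
        }
        sums.merge(key, value, Integer::sum);
    }
}
```

The skipped record leaves no trace beyond its counter, while the dropped
record has already been counted by the upstream step, which is the "some
effects may have already been taken" case.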


> 5) What happens with removed metrics when the user sets the version of
> "built.in.metrics.version" to 2.2-
>
I think for those redundant ones like "forward-rate" and "destroy-rate"
we can still remove them with 2.2- as well; for other ones that are removed
/ replaced, like thread-level skipped-records, we should still maintain them.
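
A hedged sketch of that rule in plain Java. The class
`BuiltInMetricsSketch`, the method name, and the "level:name" label strings
are hypothetical; treating every value other than "2.2-" as the new metrics
version is also an assumption, since the naming of the new value is still
under discussion.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper: which skipped-records metric is registered per version.
final class BuiltInMetricsSketch {
    static Set<String> registeredMetrics(String builtInMetricsVersion) {
        Set<String> names = new LinkedHashSet<>();
        if ("2.2-".equals(builtInMetricsVersion)) {
            // Replaced metric is still maintained for the old version.
            names.add("thread-level:skipped-records");
        } else {
            // Assumed new hierarchy: task-level skipped-records as proposed.
            names.add("task-level:skipped-records");
        }
        // Redundant metrics such as forward-rate and destroy-rate are
        // registered under neither version.
        return names;
    }
}
```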


> Best,
> Bruno
>
> On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > Hello folks,
> >
> > As 2.3 is released now, I'd like to bump up this KIP discussion again for
> > your reviews.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hello Patrik,
> > >
> > > Since we are rolling out 2.3 and everyone is busy with the release now
> > > this KIP does not have much discussion involved yet and will slip into
> the
> > > next release cadence.
> > >
> > > This KIP itself contains several parts itself: 1. refactoring the
> existing
> > > metrics hierarchy to cleanup some redundancy and also get more
> clarity; 2.
> > > add instance-level metrics like rebalance and state metrics, as well as
> > > other static metrics.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pk...@gmail.com>
> wrote:
> > >
> > >> Hi Guozhang
> > >> Thanks for the KIP, this looks very helpful.
> > >> Could you please provide more detail on the metrics planned for the
> state?
> > >> We were just considering how to implement this ourselves because we
> need
> > >> to
> > >> track the history of stage changes.
> > >> The idea was to have an accumulated "seconds in state x" metric for
> every
> > >> state.
> > >> The new rebalance metric might solve part of our use case, but it is
> > >> interesting what you have planned for the state metric.
> > >> best regards
> > >> Patrik
> > >>
> > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wa...@gmail.com>
> wrote:
> > >>
> > >> > Hello folks,
> > >> >
> > >> > I'd like to propose the following KIP to improve the Kafka Streams
> > >> metrics
> > >> > mechanism to users. This includes 1) a minor change in the public
> > >> > StreamsMetrics API, and 2) a major cleanup on the Streams' own
> built-in
> > >> > metrics hierarchy.
> > >> >
> > >> > Details can be found here:
> > >> >
> > >> >
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > >> >
> > >> > I'd love to hear your thoughts and feedbacks. Thanks!
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >>
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Good catch! Will update the wiki.

On Wed, Jul 3, 2019 at 6:19 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> I just noticed that the Per-State-Store tags are somehow mixed up in
> the KIP (e.g "client-id=[threadId]").
>
> Best,
> Bruno
>
> On Mon, Jul 1, 2019 at 6:28 PM Boyang Chen <re...@gmail.com>
> wrote:
> >
> > Hey Guozhang,
> >
> > do we plan to add per partition latency in this KIP?
> >
> > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thank you for the KIP.
> > >
> > > 1) As far as I understand, the StreamsMetrics interface is there for
> > > user-defined processors. Would it make sense to also add a method to
> > > the interface to specify a sensor that records skipped records?
> > >
> > > 2) What are the semantics of active-task-process and
> standby-task-process
> > >
> > > 3) How do dropped-late-records and expired-window-record-drop relate
> > > to each other? I guess the former is for records that fall outside the
> > > grace period and the latter is for records that are processed after
> > > the retention period of the window. Is this correct?
> > >
> > > 4) Is there an actual difference between skipped and dropped records?
> > > If not, shall we unify the terminology?
> > >
> > > 5) What happens with removed metrics when the user sets the version of
> > > "built.in.metrics.version" to 2.2-
> > >
> > > Best,
> > > Bruno
> > >
> > > On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> > > >
> > > > Hello folks,
> > > >
> > > > As 2.3 is released now, I'd like to bump up this KIP discussion
> again for
> > > > your reviews.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Patrik,
> > > > >
> > > > > Since we are rolling out 2.3 and everyone is busy with the release
> now
> > > > > this KIP does not have much discussion involved yet and will slip
> into
> > > the
> > > > > next release cadence.
> > > > >
> > > > > This KIP itself contains several parts itself: 1. refactoring the
> > > existing
> > > > > metrics hierarchy to cleanup some redundancy and also get more
> > > clarity; 2.
> > > > > add instance-level metrics like rebalance and state metrics, as
> well as
> > > > > other static metrics.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pkleindl@gmail.com
> >
> > > wrote:
> > > > >
> > > > >> Hi Guozhang
> > > > >> Thanks for the KIP, this looks very helpful.
> > > > >> Could you please provide more detail on the metrics planned for
> the
> > > state?
> > > > >> We were just considering how to implement this ourselves because
> we
> > > need
> > > > >> to
> > > > >> track the history of stage changes.
> > > > >> The idea was to have an accumulated "seconds in state x" metric
> for
> > > every
> > > > >> state.
> > > > >> The new rebalance metric might solve part of our use case, but it
> is
> > > > >> interesting what you have planned for the state metric.
> > > > >> best regards
> > > > >> Patrik
> > > > >>
> > > > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > > >>
> > > > >> > Hello folks,
> > > > >> >
> > > > >> > I'd like to propose the following KIP to improve the Kafka
> Streams
> > > > >> metrics
> > > > >> > mechanism to users. This includes 1) a minor change in the
> public
> > > > >> > StreamsMetrics API, and 2) a major cleanup on the Streams' own
> > > built-in
> > > > >> > metrics hierarchy.
> > > > >> >
> > > > >> > Details can be found here:
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > > > >> >
> > > > >> > I'd love to hear your thoughts and feedbacks. Thanks!
> > > > >> >
> > > > >> > --
> > > > >> > -- Guozhang
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Guozhang,

I just noticed that the Per-State-Store tags are somehow mixed up in
the KIP (e.g "client-id=[threadId]").

Best,
Bruno

On Mon, Jul 1, 2019 at 6:28 PM Boyang Chen <re...@gmail.com> wrote:
>
> Hey Guozhang,
>
> do we plan to add per partition latency in this KIP?
>
> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:
>
> > Hi Guozhang,
> >
> > Thank you for the KIP.
> >
> > 1) As far as I understand, the StreamsMetrics interface is there for
> > user-defined processors. Would it make sense to also add a method to
> > the interface to specify a sensor that records skipped records?
> >
> > 2) What are the semantics of active-task-process and standby-task-process
> >
> > 3) How do dropped-late-records and expired-window-record-drop relate
> > to each other? I guess the former is for records that fall outside the
> > grace period and the latter is for records that are processed after
> > the retention period of the window. Is this correct?
> >
> > 4) Is there an actual difference between skipped and dropped records?
> > If not, shall we unify the terminology?
> >
> > 5) What happens with removed metrics when the user sets the version of
> > "built.in.metrics.version" to 2.2-
> >
> > Best,
> > Bruno
> >
> > On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com> wrote:
> > >
> > > Hello folks,
> > >
> > > As 2.3 is released now, I'd like to bump up this KIP discussion again for
> > > your reviews.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Hello Patrik,
> > > >
> > > > Since we are rolling out 2.3 and everyone is busy with the release now
> > > > this KIP does not have much discussion involved yet and will slip into
> > the
> > > > next release cadence.
> > > >
> > > > This KIP itself contains several parts itself: 1. refactoring the
> > existing
> > > > metrics hierarchy to cleanup some redundancy and also get more
> > clarity; 2.
> > > > add instance-level metrics like rebalance and state metrics, as well as
> > > > other static metrics.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pk...@gmail.com>
> > wrote:
> > > >
> > > >> Hi Guozhang
> > > >> Thanks for the KIP, this looks very helpful.
> > > >> Could you please provide more detail on the metrics planned for the
> > state?
> > > >> We were just considering how to implement this ourselves because we
> > need
> > > >> to
> > > >> track the history of stage changes.
> > > >> The idea was to have an accumulated "seconds in state x" metric for
> > every
> > > >> state.
> > > >> The new rebalance metric might solve part of our use case, but it is
> > > >> interesting what you have planned for the state metric.
> > > >> best regards
> > > >> Patrik
> > > >>
> > > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > > >>
> > > >> > Hello folks,
> > > >> >
> > > >> > I'd like to propose the following KIP to improve the Kafka Streams
> > > >> metrics
> > > >> > mechanism to users. This includes 1) a minor change in the public
> > > >> > StreamsMetrics API, and 2) a major cleanup on the Streams' own
> > built-in
> > > >> > metrics hierarchy.
> > > >> >
> > > >> > Details can be found here:
> > > >> >
> > > >> >
> > > >> >
> > > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > > >> >
> > > >> > I'd love to hear your thoughts and feedbacks. Thanks!
> > > >> >
> > > >> > --
> > > >> > -- Guozhang
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
For the existing releases, yes (with KIP-447 we are already going to do
that anyway); for future releases maybe not, since hopefully we only do such
a metrics refactoring once.


Guozhang

On Fri, Aug 2, 2019 at 3:23 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> Would this imply that we need to update the config in each release to
> add a new accepted value?
>
>
> -Matthias
>
> On 8/2/19 1:07 PM, Guozhang Wang wrote:
> > Hello Matthias,
> >
> > That's a good question. Thinking about it a bit more, I'd like to propose
> > that we just list all the version numbers from 0.10 to 2.4 as accepted
> > values, and let Streams decide if the old / new set of metrics can be used
> > internally (implementation-wise we can reuse the const values for
> > `upgrade.from` as well).
> >
> > And then in the future when we remove the metrics, we can just remove the
> > corresponding version values from the accepted list of this config.
> >
> >
> > Guozhang
> >
> > On Tue, Jul 23, 2019 at 11:55 AM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Thanks for the KIP Guozhang.
> >>
> >> I just re-read the wiki page and the DISCUSS thread. Overall LGTM.
> >>
> >> The only nit is the naming of the new config values. With AK 2.3 being
> >> released the version numbers need to be updated.
> >>
> >> Additionally, I actually think that "2.2-" and "2.3" are not the best
> >> names: the `-` suffix is very subtle IMHO and actually looks more like a
> >> typo, and it might be better to be more elaborate. Maybe something like
> >> "up-to-2.2" ?
> >>
> >> For "2.3", this config value would be weird for future releases (ie,
> >> 2.4, 2.5, 2.6). Hence, we might want to rename it to "newest" /
> >> "current" or something like this?
> >>
> >> Another alternative may be to rename it to "since-2.3" (or similar) --
> >> however, this may require to rename the config if we change metrics in a
> >> future release (hence, it's not my preferred option).
> >>
> >> Thoughts?
> >>
> >>
> >> -Matthias
> >>
> >> On 7/22/19 6:33 PM, Guozhang Wang wrote:
> >>> Thanks everyone for your inputs, I've updated the wiki page
> accordingly.
> >>>
> >>> @Bruno: please let me know if you have any further thoughts per my
> >> replies
> >>> above.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Mon, Jul 22, 2019 at 6:30 PM Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >>>
> >>>> Thanks Boyang,
> >>>>
> >>>> I've thought about exposing time via metrics in Streams. The tricky part
> >>>> though is which layer of time we should expose: right now we have
> >>>> task-level and partition-level stream time (what you suggested), and also
> >>>> some processors internally maintain their own observed time. Today we are
> >>>> still trying to get a clear and simple way of exposing a single time
> >>>> concept for users to reason about their application's progress. So before
> >>>> we come up with a good solution I'd postpone adding it to a future KIP.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Thu, Jul 18, 2019 at 1:21 PM Boyang Chen <
> reluctanthero104@gmail.com
> >>>
> >>>> wrote:
> >>>>
> >>>>> I mean the partition time.
> >>>>>
> >>>>> On Thu, Jul 18, 2019 at 11:29 AM Guozhang Wang <wa...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Boyang,
> >>>>>>
> >>>>>> What do you mean by `per partition latency`?
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Mon, Jul 1, 2019 at 9:28 AM Boyang Chen <
> >> reluctanthero104@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hey Guozhang,
> >>>>>>>
> >>>>>>> do we plan to add per partition latency in this KIP?
> >>>>>>>
> >>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Guozhang,
> >>>>>>>>
> >>>>>>>> Thank you for the KIP.
> >>>>>>>>
> >>>>>>>> 1) As far as I understand, the StreamsMetrics interface is there
> for
> >>>>>>>> user-defined processors. Would it make sense to also add a method
> to
> >>>>>>>> the interface to specify a sensor that records skipped records?
> >>>>>>>>
> >>>>>>>> 2) What are the semantics of active-task-process and
> >>>>>> standby-task-process
> >>>>>>>>
> >>>>>>>> 3) How do dropped-late-records and expired-window-record-drop
> relate
> >>>>>>>> to each other? I guess the former is for records that fall outside
> >>>>> the
> >>>>>>>> grace period and the latter is for records that are processed
> after
> >>>>>>>> the retention period of the window. Is this correct?
> >>>>>>>>
> >>>>>>>> 4) Is there an actual difference between skipped and dropped
> >>>>> records?
> >>>>>>>> If not, shall we unify the terminology?
> >>>>>>>>
> >>>>>>>> 5) What happens with removed metrics when the user sets the
> version
> >>>>> of
> >>>>>>>> "built.in.metrics.version" to 2.2-
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>> On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wangguoz@gmail.com
> >
> >>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hello folks,
> >>>>>>>>>
> >>>>>>>>> As 2.3 is released now, I'd like to bump up this KIP discussion
> >>>>> again
> >>>>>>> for
> >>>>>>>>> your reviews.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <
> wangguoz@gmail.com
> >>>>>>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello Patrik,
> >>>>>>>>>>
> >>>>>>>>>> Since we are rolling out 2.3 and everyone is busy with the
> >>>>> release
> >>>>>>> now
> >>>>>>>>>> this KIP does not have much discussion involved yet and will
> >>>>> slip
> >>>>>>> into
> >>>>>>>> the
> >>>>>>>>>> next release cadence.
> >>>>>>>>>>
> >>>>>>>>>> This KIP itself contains several parts itself: 1. refactoring
> >>>>> the
> >>>>>>>> existing
> >>>>>>>>>> metrics hierarchy to cleanup some redundancy and also get more
> >>>>>>>> clarity; 2.
> >>>>>>>>>> add instance-level metrics like rebalance and state metrics, as
> >>>>>> well
> >>>>>>> as
> >>>>>>>>>> other static metrics.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <
> >>>>> pkleindl@gmail.com
> >>>>>>>
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Guozhang
> >>>>>>>>>>> Thanks for the KIP, this looks very helpful.
> >>>>>>>>>>> Could you please provide more detail on the metrics planned for
> >>>>>> the
> >>>>>>>> state?
> >>>>>>>>>>> We were just considering how to implement this ourselves
> >>>>> because
> >>>>>> we
> >>>>>>>> need
> >>>>>>>>>>> to
> >>>>>>>>>>> track the history of stage changes.
> >>>>>>>>>>> The idea was to have an accumulated "seconds in state x" metric
> >>>>>> for
> >>>>>>>> every
> >>>>>>>>>>> state.
> >>>>>>>>>>> The new rebalance metric might solve part of our use case, but
> >>>>> it
> >>>>>> is
> >>>>>>>>>>> interesting what you have planned for the state metric.
> >>>>>>>>>>> best regards
> >>>>>>>>>>> Patrik
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <
> >>>>> wangguoz@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hello folks,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'd like to propose the following KIP to improve the Kafka
> >>>>>> Streams
> >>>>>>>>>>> metrics
> >>>>>>>>>>>> mechanism to users. This includes 1) a minor change in the
> >>>>>> public
> >>>>>>>>>>>> StreamsMetrics API, and 2) a major cleanup on the Streams'
> >>>>> own
> >>>>>>>> built-in
> >>>>>>>>>>>> metrics hierarchy.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Details can be found here:
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'd love to hear your thoughts and feedbacks. Thanks!
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>>
> >>
> >>
> >
>
>

-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Would this imply that we need to update the config in each release to
add a new accepted value?


-Matthias

On 8/2/19 1:07 PM, Guozhang Wang wrote:
> Hello Matthias,
> 
> That's a good question. Thinking about a bit more, I'd like to propose that
> we just list all the version numbers since 0.10 to 2.4 as accepted values,
> and let Streams to decide if old / new set of metrics can be used
> internally (implementation wise we can reuse the const values for
> `upgrade.from` as well).
> 
> And then in the future when we remove the metrics, we can just remove the
> corresponding version values from the accepted list of this config.
> 
> 
> Guozhang
> 
> On Tue, Jul 23, 2019 at 11:55 AM Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Thanks for the KIP Guozhang.
>>
>> I just re-read the wiki page and the DISCUSS thread. Overall LGTM.
>>
>> The only nit is the naming of the new config values. With AK 2.3 being
>> released the versions numbers needs to be updated.
>>
>> Additionally, I actually think that "2.2-" and "2.3" are not the best
>> names: the `-` suffix is very subtle IMHO and actually looks more like a
>> typo, and it might be better to be more elaborate. Maybe something like
>> "up-to-2.2" ?
>>
>> For "2.3", this config value would be weird for future releases (ie,
>> 2.4, 2.5, 2.6). Hence, we might want to rename it to "newest" /
>> "current" or something like this?
>>
>> Another alternative may be to rename it to "since-2.3" (or similar) --
>> however, this may require to rename the config if we change metrics in a
>> future release (hence, it's not my preferred option).
>>
>> Thoughts?
>>
>>
>> -Matthias
>>
>> On 7/22/19 6:33 PM, Guozhang Wang wrote:
>>> Thanks everyone for your inputs, I've updated the wiki page accordingly.
>>>
>>> @Bruno: please let me know if you have any further thoughts per my
>> replies
>>> above.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Mon, Jul 22, 2019 at 6:30 PM Guozhang Wang <wa...@gmail.com>
>> wrote:
>>>
>>>> Thanks Boyang,
>>>>
>>>> I've thought about exposing time via metrics in Streams. The tricky part
>>>> though is which layer of time we should expose: right now we have
>>>> task-level and partition-level stream time (what you suggested), and
>> also
>>>> some processor internally maintain their own observed time. Today we are
>>>> still trying to get a clear and simple way of exposing a single time
>>>> concept for users to reason about their application's progress. So
>> before
>>>> we come up with a good solution I'd postpone adding it in a future KIP.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Thu, Jul 18, 2019 at 1:21 PM Boyang Chen <reluctanthero104@gmail.com
>>>
>>>> wrote:
>>>>
>>>>> I mean the partition time.
>>>>>
>>>>> On Thu, Jul 18, 2019 at 11:29 AM Guozhang Wang <wa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Boyang,
>>>>>>
>>>>>> What do you mean by `per partition latency`?
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Mon, Jul 1, 2019 at 9:28 AM Boyang Chen <
>> reluctanthero104@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Guozhang,
>>>>>>>
>>>>>>> do we plan to add per partition latency in this KIP?
>>>>>>>
>>>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> Thank you for the KIP.
>>>>>>>>
>>>>>>>> 1) As far as I understand, the StreamsMetrics interface is there for
>>>>>>>> user-defined processors. Would it make sense to also add a method to
>>>>>>>> the interface to specify a sensor that records skipped records?
>>>>>>>>
>>>>>>>> 2) What are the semantics of active-task-process and
>>>>>> standby-task-process
>>>>>>>>
>>>>>>>> 3) How do dropped-late-records and expired-window-record-drop relate
>>>>>>>> to each other? I guess the former is for records that fall outside
>>>>> the
>>>>>>>> grace period and the latter is for records that are processed after
>>>>>>>> the retention period of the window. Is this correct?
>>>>>>>>
>>>>>>>> 4) Is there an actual difference between skipped and dropped
>>>>> records?
>>>>>>>> If not, shall we unify the terminology?
>>>>>>>>
>>>>>>>> 5) What happens with removed metrics when the user sets the version
>>>>> of
>>>>>>>> "built.in.metrics.version" to 2.2-
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Bruno
>>>>>>>>
>>>>>>>> On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com>
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hello folks,
>>>>>>>>>
>>>>>>>>> As 2.3 is released now, I'd like to bump up this KIP discussion
>>>>> again
>>>>>>> for
>>>>>>>>> your reviews.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wangguoz@gmail.com
>>>>>>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Patrik,
>>>>>>>>>>
>>>>>>>>>> Since we are rolling out 2.3 and everyone is busy with the
>>>>> release
>>>>>>> now
>>>>>>>>>> this KIP does not have much discussion involved yet and will
>>>>> slip
>>>>>>> into
>>>>>>>> the
>>>>>>>>>> next release cadence.
>>>>>>>>>>
>>>>>>>>>> This KIP itself contains several parts itself: 1. refactoring
>>>>> the
>>>>>>>> existing
>>>>>>>>>> metrics hierarchy to cleanup some redundancy and also get more
>>>>>>>> clarity; 2.
>>>>>>>>>> add instance-level metrics like rebalance and state metrics, as
>>>>>> well
>>>>>>> as
>>>>>>>>>> other static metrics.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <
>>>>> pkleindl@gmail.com
>>>>>>>
>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Guozhang
>>>>>>>>>>> Thanks for the KIP, this looks very helpful.
>>>>>>>>>>> Could you please provide more detail on the metrics planned for
>>>>>> the
>>>>>>>> state?
>>>>>>>>>>> We were just considering how to implement this ourselves
>>>>> because
>>>>>> we
>>>>>>>> need
>>>>>>>>>>> to
>>>>>>>>>>> track the history of stage changes.
>>>>>>>>>>> The idea was to have an accumulated "seconds in state x" metric
>>>>>> for
>>>>>>>> every
>>>>>>>>>>> state.
>>>>>>>>>>> The new rebalance metric might solve part of our use case, but
>>>>> it
>>>>>> is
>>>>>>>>>>> interesting what you have planned for the state metric.
>>>>>>>>>>> best regards
>>>>>>>>>>> Patrik
>>>>>>>>>>>
>>>>>>>>>>> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <
>>>>> wangguoz@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>
>>>>>>>>>>>> I'd like to propose the following KIP to improve the Kafka
>>>>>> Streams
>>>>>>>>>>> metrics
>>>>>>>>>>>> mechanism to users. This includes 1) a minor change in the
>>>>>> public
>>>>>>>>>>>> StreamsMetrics API, and 2) a major cleanup on the Streams'
>>>>> own
>>>>>>>> built-in
>>>>>>>>>>>> metrics hierarchy.
>>>>>>>>>>>>
>>>>>>>>>>>> Details can be found here:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
>>>>>>>>>>>>
>>>>>>>>>>>> I'd love to hear your thoughts and feedbacks. Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>>
>>
>>
> 


Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Matthias,

That's a good question. Thinking about it a bit more, I'd like to propose that
we just list all the version numbers from 0.10 to 2.4 as accepted values,
and let Streams decide internally whether the old or new set of metrics is
used (implementation-wise we can reuse the const values for
`upgrade.from` as well).

And then in the future when we remove the metrics, we can just remove the
corresponding version values from the accepted list of this config.
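
A minimal sketch of this idea, assuming a hypothetical helper class
`BuiltInMetricsVersion` (not an actual Streams class) and assuming, purely for
illustration, that the new metrics set starts with 2.4. The accepted-values
list is also illustrative:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka Streams.
public class BuiltInMetricsVersion {
    // Illustrative accepted values, oldest first. The real list would reuse
    // the constants already defined for `upgrade.from`.
    static final List<String> ACCEPTED = Arrays.asList(
        "0.10.0", "0.10.1", "0.10.2", "0.11.0",
        "1.0", "1.1", "2.0", "2.1", "2.2", "2.3", "2.4");

    // Assumption for this sketch: the refactored metrics begin at this version.
    static final String FIRST_NEW_VERSION = "2.4";

    // Returns true if the configured version selects the new metrics set,
    // false if it selects the old one; rejects values not in the list.
    static boolean useNewMetrics(String configured) {
        int idx = ACCEPTED.indexOf(configured);
        if (idx < 0) {
            throw new IllegalArgumentException(
                "Unknown built.in.metrics.version: " + configured);
        }
        return idx >= ACCEPTED.indexOf(FIRST_NEW_VERSION);
    }

    public static void main(String[] args) {
        System.out.println("2.2 -> new metrics? " + useNewMetrics("2.2"));
        System.out.println("2.4 -> new metrics? " + useNewMetrics("2.4"));
    }
}
```

With this shape, retiring old metrics later would only mean dropping entries
from the accepted list, so users never need to learn a new naming scheme.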


Guozhang

On Tue, Jul 23, 2019 at 11:55 AM Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for the KIP Guozhang.
>
> I just re-read the wiki page and the DISCUSS thread. Overall LGTM.
>
> The only nit is the naming of the new config values. With AK 2.3 being
> released the versions numbers needs to be updated.
>
> Additionally, I actually think that "2.2-" and "2.3" are not the best
> names: the `-` suffix is very subtle IMHO and actually looks more like a
> typo, and it might be better to be more elaborate. Maybe something like
> "up-to-2.2" ?
>
> For "2.3", this config value would be weird for future releases (ie,
> 2.4, 2.5, 2.6). Hence, we might want to rename it to "newest" /
> "current" or something like this?
>
> Another alternative may be to rename it to "since-2.3" (or similar) --
> however, this may require to rename the config if we change metrics in a
> future release (hence, it's not my preferred option).
>
> Thoughts?
>
>
> -Matthias
>
> On 7/22/19 6:33 PM, Guozhang Wang wrote:
> > Thanks everyone for your inputs, I've updated the wiki page accordingly.
> >
> > @Bruno: please let me know if you have any further thoughts per my
> replies
> > above.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Jul 22, 2019 at 6:30 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> >> Thanks Boyang,
> >>
> >> I've thought about exposing time via metrics in Streams. The tricky part
> >> though is which layer of time we should expose: right now we have
> >> task-level and partition-level stream time (what you suggested), and
> also
> >> some processor internally maintain their own observed time. Today we are
> >> still trying to get a clear and simple way of exposing a single time
> >> concept for users to reason about their application's progress. So
> before
> >> we come up with a good solution I'd postpone adding it in a future KIP.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Thu, Jul 18, 2019 at 1:21 PM Boyang Chen <reluctanthero104@gmail.com
> >
> >> wrote:
> >>
> >>> I mean the partition time.
> >>>
> >>> On Thu, Jul 18, 2019 at 11:29 AM Guozhang Wang <wa...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Boyang,
> >>>>
> >>>> What do you mean by `per partition latency`?
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Mon, Jul 1, 2019 at 9:28 AM Boyang Chen <
> reluctanthero104@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hey Guozhang,
> >>>>>
> >>>>> do we plan to add per partition latency in this KIP?
> >>>>>
> >>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
> >>> wrote:
> >>>>>
> >>>>>> Hi Guozhang,
> >>>>>>
> >>>>>> Thank you for the KIP.
> >>>>>>
> >>>>>> 1) As far as I understand, the StreamsMetrics interface is there for
> >>>>>> user-defined processors. Would it make sense to also add a method to
> >>>>>> the interface to specify a sensor that records skipped records?
> >>>>>>
> >>>>>> 2) What are the semantics of active-task-process and
> >>>> standby-task-process
> >>>>>>
> >>>>>> 3) How do dropped-late-records and expired-window-record-drop relate
> >>>>>> to each other? I guess the former is for records that fall outside
> >>> the
> >>>>>> grace period and the latter is for records that are processed after
> >>>>>> the retention period of the window. Is this correct?
> >>>>>>
> >>>>>> 4) Is there an actual difference between skipped and dropped
> >>> records?
> >>>>>> If not, shall we unify the terminology?
> >>>>>>
> >>>>>> 5) What happens with removed metrics when the user sets the version
> >>> of
> >>>>>> "built.in.metrics.version" to 2.2-
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>> Hello folks,
> >>>>>>>
> >>>>>>> As 2.3 is released now, I'd like to bump up this KIP discussion
> >>> again
> >>>>> for
> >>>>>>> your reviews.
> >>>>>>>
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wangguoz@gmail.com
> >>>>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello Patrik,
> >>>>>>>>
> >>>>>>>> Since we are rolling out 2.3 and everyone is busy with the
> >>> release
> >>>>> now
> >>>>>>>> this KIP does not have much discussion involved yet and will
> >>> slip
> >>>>> into
> >>>>>> the
> >>>>>>>> next release cadence.
> >>>>>>>>
> >>>>>>>> This KIP itself contains several parts itself: 1. refactoring
> >>> the
> >>>>>> existing
> >>>>>>>> metrics hierarchy to cleanup some redundancy and also get more
> >>>>>> clarity; 2.
> >>>>>>>> add instance-level metrics like rebalance and state metrics, as
> >>>> well
> >>>>> as
> >>>>>>>> other static metrics.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <
> >>> pkleindl@gmail.com
> >>>>>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Guozhang
> >>>>>>>>> Thanks for the KIP, this looks very helpful.
> >>>>>>>>> Could you please provide more detail on the metrics planned for
> >>>> the
> >>>>>> state?
> >>>>>>>>> We were just considering how to implement this ourselves
> >>> because
> >>>> we
> >>>>>> need
> >>>>>>>>> to
> >>>>>>>>> track the history of stage changes.
> >>>>>>>>> The idea was to have an accumulated "seconds in state x" metric
> >>>> for
> >>>>>> every
> >>>>>>>>> state.
> >>>>>>>>> The new rebalance metric might solve part of our use case, but
> >>> it
> >>>> is
> >>>>>>>>> interesting what you have planned for the state metric.
> >>>>>>>>> best regards
> >>>>>>>>> Patrik
> >>>>>>>>>
> >>>>>>>>> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <
> >>> wangguoz@gmail.com>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello folks,
> >>>>>>>>>>
> >>>>>>>>>> I'd like to propose the following KIP to improve the Kafka
> >>>> Streams
> >>>>>>>>> metrics
> >>>>>>>>>> mechanism to users. This includes 1) a minor change in the
> >>>> public
> >>>>>>>>>> StreamsMetrics API, and 2) a major cleanup on the Streams'
> >>> own
> >>>>>> built-in
> >>>>>>>>>> metrics hierarchy.
> >>>>>>>>>>
> >>>>>>>>>> Details can be found here:
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> >>>>>>>>>>
> >>>>>>>>>> I'd love to hear your thoughts and feedbacks. Thanks!
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>
>

-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for the KIP Guozhang.

I just re-read the wiki page and the DISCUSS thread. Overall LGTM.

The only nit is the naming of the new config values. With AK 2.3 being
released, the version numbers need to be updated.

Additionally, I actually think that "2.2-" and "2.3" are not the best
names: the `-` suffix is very subtle IMHO and actually looks more like a
typo, and it might be better to be more elaborate. Maybe something like
"up-to-2.2" ?

For "2.3", this config value would be weird for future releases (i.e.,
2.4, 2.5, 2.6). Hence, we might want to rename it to "newest" /
"current" or something like this?

Another alternative may be to rename it to "since-2.3" (or similar) --
however, this may require renaming the config value if we change metrics in a
future release (hence, it's not my preferred option).
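
To show how the suggested names would read in user code, here is a minimal
sketch. The enum `MetricsVersion` and the class around it are hypothetical,
and the values "up-to-2.2" and "newest" are only the candidates floated above,
not a decided API:

```java
import java.util.Properties;

// Hypothetical illustration of the naming options; not Streams code.
public class MetricsVersionNaming {
    enum MetricsVersion {
        UP_TO_2_2("up-to-2.2"), // old metrics hierarchy
        NEWEST("newest");       // name stays meaningful in 2.4, 2.5, ...

        final String configValue;

        MetricsVersion(String configValue) {
            this.configValue = configValue;
        }

        // Maps a config string to the enum, rejecting anything else.
        static MetricsVersion parse(String value) {
            for (MetricsVersion v : values()) {
                if (v.configValue.equals(value)) {
                    return v;
                }
            }
            throw new IllegalArgumentException("Unsupported value: " + value);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("built.in.metrics.version", "up-to-2.2");
        System.out.println(
            MetricsVersion.parse(props.getProperty("built.in.metrics.version")));
    }
}
```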

Thoughts?


-Matthias

On 7/22/19 6:33 PM, Guozhang Wang wrote:
> Thanks everyone for your inputs, I've updated the wiki page accordingly.
> 
> @Bruno: please let me know if you have any further thoughts per my replies
> above.
> 
> 
> Guozhang
> 
> 
> On Mon, Jul 22, 2019 at 6:30 PM Guozhang Wang <wa...@gmail.com> wrote:
> 
>> Thanks Boyang,
>>
>> I've thought about exposing time via metrics in Streams. The tricky part
>> though is which layer of time we should expose: right now we have
>> task-level and partition-level stream time (what you suggested), and also
>> some processor internally maintain their own observed time. Today we are
>> still trying to get a clear and simple way of exposing a single time
>> concept for users to reason about their application's progress. So before
>> we come up with a good solution I'd postpone adding it in a future KIP.
>>
>>
>> Guozhang
>>
>>
>> On Thu, Jul 18, 2019 at 1:21 PM Boyang Chen <re...@gmail.com>
>> wrote:
>>
>>> I mean the partition time.
>>>
>>> On Thu, Jul 18, 2019 at 11:29 AM Guozhang Wang <wa...@gmail.com>
>>> wrote:
>>>
>>>> Hi Boyang,
>>>>
>>>> What do you mean by `per partition latency`?
>>>>
>>>> Guozhang
>>>>
>>>> On Mon, Jul 1, 2019 at 9:28 AM Boyang Chen <re...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey Guozhang,
>>>>>
>>>>> do we plan to add per partition latency in this KIP?
>>>>>
>>>>> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
>>> wrote:
>>>>>
>>>>>> Hi Guozhang,
>>>>>>
>>>>>> Thank you for the KIP.
>>>>>>
>>>>>> 1) As far as I understand, the StreamsMetrics interface is there for
>>>>>> user-defined processors. Would it make sense to also add a method to
>>>>>> the interface to specify a sensor that records skipped records?
>>>>>>
>>>>>> 2) What are the semantics of active-task-process and
>>>> standby-task-process
>>>>>>
>>>>>> 3) How do dropped-late-records and expired-window-record-drop relate
>>>>>> to each other? I guess the former is for records that fall outside
>>> the
>>>>>> grace period and the latter is for records that are processed after
>>>>>> the retention period of the window. Is this correct?
>>>>>>
>>>>>> 4) Is there an actual difference between skipped and dropped
>>> records?
>>>>>> If not, shall we unify the terminology?
>>>>>>
>>>>>> 5) What happens with removed metrics when the user sets the version
>>> of
>>>>>> "built.in.metrics.version" to 2.2-
>>>>>>
>>>>>> Best,
>>>>>> Bruno
>>>>>>
>>>>>> On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com>
>>>>> wrote:
>>>>>>>
>>>>>>> Hello folks,
>>>>>>>
>>>>>>> As 2.3 is released now, I'd like to bump up this KIP discussion
>>> again
>>>>> for
>>>>>>> your reviews.
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wangguoz@gmail.com
>>>>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Patrik,
>>>>>>>>
>>>>>>>> Since we are rolling out 2.3 and everyone is busy with the
>>> release
>>>>> now
>>>>>>>> this KIP does not have much discussion involved yet and will
>>> slip
>>>>> into
>>>>>> the
>>>>>>>> next release cadence.
>>>>>>>>
>>>>>>>> This KIP itself contains several parts itself: 1. refactoring
>>> the
>>>>>> existing
>>>>>>>> metrics hierarchy to cleanup some redundancy and also get more
>>>>>> clarity; 2.
>>>>>>>> add instance-level metrics like rebalance and state metrics, as
>>>> well
>>>>> as
>>>>>>>> other static metrics.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <
>>> pkleindl@gmail.com
>>>>>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Guozhang
>>>>>>>>> Thanks for the KIP, this looks very helpful.
>>>>>>>>> Could you please provide more detail on the metrics planned for
>>>> the
>>>>>> state?
>>>>>>>>> We were just considering how to implement this ourselves
>>> because
>>>> we
>>>>>> need
>>>>>>>>> to
>>>>>>>>> track the history of stage changes.
>>>>>>>>> The idea was to have an accumulated "seconds in state x" metric
>>>> for
>>>>>> every
>>>>>>>>> state.
>>>>>>>>> The new rebalance metric might solve part of our use case, but
>>> it
>>>> is
>>>>>>>>> interesting what you have planned for the state metric.
>>>>>>>>> best regards
>>>>>>>>> Patrik
>>>>>>>>>
>>>>>>>>> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <
>>> wangguoz@gmail.com>
>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello folks,
>>>>>>>>>>
>>>>>>>>>> I'd like to propose the following KIP to improve the Kafka
>>>> Streams
>>>>>>>>> metrics
>>>>>>>>>> mechanism to users. This includes 1) a minor change in the
>>>> public
>>>>>>>>>> StreamsMetrics API, and 2) a major cleanup on the Streams'
>>> own
>>>>>> built-in
>>>>>>>>>> metrics hierarchy.
>>>>>>>>>>
>>>>>>>>>> Details can be found here:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
>>>>>>>>>>
>>>>>>>>>> I'd love to hear your thoughts and feedbacks. Thanks!
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
>>
>> --
>> -- Guozhang
>>
> 
> 


Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks everyone for your input; I've updated the wiki page accordingly.

@Bruno: please let me know if you have any further thoughts per my replies
above.


Guozhang


On Mon, Jul 22, 2019 at 6:30 PM Guozhang Wang <wa...@gmail.com> wrote:

> Thanks Boyang,
>
> I've thought about exposing time via metrics in Streams. The tricky part
> though is which layer of time we should expose: right now we have
> task-level and partition-level stream time (what you suggested), and also
> some processor internally maintain their own observed time. Today we are
> still trying to get a clear and simple way of exposing a single time
> concept for users to reason about their application's progress. So before
> we come up with a good solution I'd postpone adding it in a future KIP.
>
>
> Guozhang
>
>
> On Thu, Jul 18, 2019 at 1:21 PM Boyang Chen <re...@gmail.com>
> wrote:
>
>> I mean the partition time.
>>
>> On Thu, Jul 18, 2019 at 11:29 AM Guozhang Wang <wa...@gmail.com>
>> wrote:
>>
>> > Hi Boyang,
>> >
>> > What do you mean by `per partition latency`?
>> >
>> > Guozhang
>> >
>> > On Mon, Jul 1, 2019 at 9:28 AM Boyang Chen <re...@gmail.com>
>> > wrote:
>> >
>> > > Hey Guozhang,
>> > >
>> > > do we plan to add per partition latency in this KIP?
>> > >
>> > > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
>> wrote:
>> > >
>> > > > Hi Guozhang,
>> > > >
>> > > > Thank you for the KIP.
>> > > >
>> > > > 1) As far as I understand, the StreamsMetrics interface is there for
>> > > > user-defined processors. Would it make sense to also add a method to
>> > > > the interface to specify a sensor that records skipped records?
>> > > >
>> > > > 2) What are the semantics of active-task-process and
>> > standby-task-process
>> > > >
>> > > > 3) How do dropped-late-records and expired-window-record-drop relate
>> > > > to each other? I guess the former is for records that fall outside
>> the
>> > > > grace period and the latter is for records that are processed after
>> > > > the retention period of the window. Is this correct?
>> > > >
>> > > > 4) Is there an actual difference between skipped and dropped
>> records?
>> > > > If not, shall we unify the terminology?
>> > > >
>> > > > 5) What happens with removed metrics when the user sets the version
>> of
>> > > > "built.in.metrics.version" to 2.2-
>> > > >
>> > > > Best,
>> > > > Bruno
>> > > >
>> > > > On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > Hello folks,
>> > > > >
>> > > > > As 2.3 is released now, I'd like to bump up this KIP discussion
>> again
>> > > for
>> > > > > your reviews.
>> > > > >
>> > > > >
>> > > > > Guozhang
>> > > > >
>> > > > >
>> > > > > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wangguoz@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > > > Hello Patrik,
>> > > > > >
>> > > > > > Since we are rolling out 2.3 and everyone is busy with the
>> release
>> > > now
>> > > > > > this KIP does not have much discussion involved yet and will
>> slip
>> > > into
>> > > > the
>> > > > > > next release cadence.
>> > > > > >
>> > > > > > This KIP itself contains several parts itself: 1. refactoring
>> the
>> > > > existing
>> > > > > > metrics hierarchy to cleanup some redundancy and also get more
>> > > > clarity; 2.
>> > > > > > add instance-level metrics like rebalance and state metrics, as
>> > well
>> > > as
>> > > > > > other static metrics.
>> > > > > >
>> > > > > >
>> > > > > > Guozhang
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <
>> pkleindl@gmail.com
>> > >
>> > > > wrote:
>> > > > > >
>> > > > > >> Hi Guozhang
>> > > > > >> Thanks for the KIP, this looks very helpful.
>> > > > > >> Could you please provide more detail on the metrics planned for
>> > the
>> > > > state?
>> > > > > >> We were just considering how to implement this ourselves
>> because
>> > we
>> > > > need
>> > > > > >> to
>> > > > > >> track the history of stage changes.
>> > > > > >> The idea was to have an accumulated "seconds in state x" metric
>> > for
>> > > > every
>> > > > > >> state.
>> > > > > >> The new rebalance metric might solve part of our use case, but
>> it
>> > is
>> > > > > >> interesting what you have planned for the state metric.
>> > > > > >> best regards
>> > > > > >> Patrik
>> > > > > >>
>> > > > > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <
>> wangguoz@gmail.com>
>> > > > wrote:
>> > > > > >>
>> > > > > >> > Hello folks,
>> > > > > >> >
>> > > > > >> > I'd like to propose the following KIP to improve the Kafka
>> > Streams
>> > > > > >> metrics
>> > > > > >> > mechanism to users. This includes 1) a minor change in the
>> > public
>> > > > > >> > StreamsMetrics API, and 2) a major cleanup on the Streams'
>> own
>> > > > built-in
>> > > > > >> > metrics hierarchy.
>> > > > > >> >
>> > > > > >> > Details can be found here:
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >
>> > > > > >>
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
>> > > > > >> >
>> > > > > >> > I'd love to hear your thoughts and feedbacks. Thanks!
>> > > > > >> >
>> > > > > >> > --
>> > > > > >> > -- Guozhang
>> > > > > >> >
>> > > > > >>
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > >
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Boyang,

I've thought about exposing time via metrics in Streams. The tricky part,
though, is which layer of time we should expose: right now we have
task-level and partition-level stream time (what you suggested), and also
some processors internally maintain their own observed time. Today we are
still trying to find a clear and simple way of exposing a single time
concept for users to reason about their application's progress. So until
we come up with a good solution, I'd postpone adding it to a future KIP.
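
To illustrate why picking a single layer is tricky, here is a minimal sketch.
`StreamTimeSketch` is a hypothetical class, not Streams code. It assumes that
partition time is the highest timestamp seen on a partition, and the two
task-level definitions are assumptions chosen only to show that plausible
definitions can disagree:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker for illustration only; not part of Kafka Streams.
public class StreamTimeSketch {
    // Partition-level time: highest record timestamp observed per partition.
    private final Map<String, Long> partitionTime = new HashMap<>();

    // Records a timestamp; partition time never moves backwards.
    void observe(String partition, long timestampMs) {
        partitionTime.merge(partition, timestampMs, Math::max);
    }

    long partitionTime(String partition) {
        return partitionTime.getOrDefault(partition, -1L);
    }

    // One candidate task-level time: the slowest partition (assumption).
    long taskTimeAsMin() {
        return partitionTime.values().stream()
            .mapToLong(Long::longValue).min().orElse(-1L);
    }

    // Another candidate: the fastest partition (assumption).
    long taskTimeAsMax() {
        return partitionTime.values().stream()
            .mapToLong(Long::longValue).max().orElse(-1L);
    }

    public static void main(String[] args) {
        StreamTimeSketch t = new StreamTimeSketch();
        t.observe("orders-0", 100L);
        t.observe("orders-0", 90L);   // out-of-order record, ignored for time
        t.observe("payments-0", 250L);
        System.out.println("min-based task time: " + t.taskTimeAsMin());
        System.out.println("max-based task time: " + t.taskTimeAsMax());
    }
}
```

A metric built on either definition would report a different "progress" for
the same input, which is the ambiguity that needs resolving first.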


Guozhang


On Thu, Jul 18, 2019 at 1:21 PM Boyang Chen <re...@gmail.com>
wrote:

> I mean the partition time.
>
> On Thu, Jul 18, 2019 at 11:29 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Boyang,
> >
> > What do you mean by `per partition latency`?
> >
> > Guozhang
> >
> > On Mon, Jul 1, 2019 at 9:28 AM Boyang Chen <re...@gmail.com>
> > wrote:
> >
> > > Hey Guozhang,
> > >
> > > do we plan to add per partition latency in this KIP?
> > >
> > > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io>
> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thank you for the KIP.
> > > >
> > > > 1) As far as I understand, the StreamsMetrics interface is there for
> > > > user-defined processors. Would it make sense to also add a method to
> > > > the interface to specify a sensor that records skipped records?
> > > >
> > > > 2) What are the semantics of active-task-process and
> > standby-task-process
> > > >
> > > > 3) How do dropped-late-records and expired-window-record-drop relate
> > > > to each other? I guess the former is for records that fall outside
> the
> > > > grace period and the latter is for records that are processed after
> > > > the retention period of the window. Is this correct?
> > > >
> > > > 4) Is there an actual difference between skipped and dropped records?
> > > > If not, shall we unify the terminology?
> > > >
> > > > 5) What happens with removed metrics when the user sets the version
> of
> > > > "built.in.metrics.version" to 2.2-
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > > >
> > > > > Hello folks,
> > > > >
> > > > > As 2.3 is released now, I'd like to bump up this KIP discussion
> again
> > > for
> > > > > your reviews.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hello Patrik,
> > > > > >
> > > > > > Since we are rolling out 2.3 and everyone is busy with the
> release
> > > now
> > > > > > this KIP does not have much discussion involved yet and will slip
> > > into
> > > > the
> > > > > > next release cadence.
> > > > > >
> > > > > > This KIP itself contains several parts itself: 1. refactoring the
> > > > existing
> > > > > > metrics hierarchy to cleanup some redundancy and also get more
> > > > clarity; 2.
> > > > > > add instance-level metrics like rebalance and state metrics, as
> > well
> > > as
> > > > > > other static metrics.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <
> pkleindl@gmail.com
> > >
> > > > wrote:
> > > > > >
> > > > > >> Hi Guozhang
> > > > > >> Thanks for the KIP, this looks very helpful.
> > > > > >> Could you please provide more detail on the metrics planned for
> > the
> > > > state?
> > > > > >> We were just considering how to implement this ourselves because
> > we
> > > > need
> > > > > >> to
> > > > > >> track the history of stage changes.
> > > > > >> The idea was to have an accumulated "seconds in state x" metric
> > for
> > > > every
> > > > > >> state.
> > > > > >> The new rebalance metric might solve part of our use case, but
> it
> > is
> > > > > >> interesting what you have planned for the state metric.
> > > > > >> best regards
> > > > > >> Patrik
> > > > > >>
> > > > > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > > >>
> > > > > >> > Hello folks,
> > > > > >> >
> > > > > >> > I'd like to propose the following KIP to improve the Kafka
> > Streams
> > > > > >> metrics
> > > > > >> > mechanism to users. This includes 1) a minor change in the
> > public
> > > > > >> > StreamsMetrics API, and 2) a major cleanup on the Streams' own
> > > > built-in
> > > > > >> > metrics hierarchy.
> > > > > >> >
> > > > > >> > Details can be found here:
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > > > > >> >
> > > > > >> > I'd love to hear your thoughts and feedbacks. Thanks!
> > > > > >> >
> > > > > >> > --
> > > > > >> > -- Guozhang
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Boyang Chen <re...@gmail.com>.
I mean the partition time.

On Thu, Jul 18, 2019 at 11:29 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Boyang,
>
> What do you mean by `per partition latency`?
>
> Guozhang
>
> On Mon, Jul 1, 2019 at 9:28 AM Boyang Chen <re...@gmail.com>
> wrote:
>
> > Hey Guozhang,
> >
> > do we plan to add per partition latency in this KIP?
> >
> > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thank you for the KIP.
> > >
> > > 1) As far as I understand, the StreamsMetrics interface is there for
> > > user-defined processors. Would it make sense to also add a method to
> > > the interface to specify a sensor that records skipped records?
> > >
> > > 2) What are the semantics of active-task-process and
> standby-task-process
> > >
> > > 3) How do dropped-late-records and expired-window-record-drop relate
> > > to each other? I guess the former is for records that fall outside the
> > > grace period and the latter is for records that are processed after
> > > the retention period of the window. Is this correct?
> > >
> > > 4) Is there an actual difference between skipped and dropped records?
> > > If not, shall we unify the terminology?
> > >
> > > 5) What happens with removed metrics when the user sets the version of
> > > "built.in.metrics.version" to 2.2-
> > >
> > > Best,
> > > Bruno
> > >
> > > On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > > >
> > > > Hello folks,
> > > >
> > > > As 2.3 is released now, I'd like to bump up this KIP discussion again
> > for
> > > > your reviews.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Patrik,
> > > > >
> > > > > Since we are rolling out 2.3 and everyone is busy with the release
> > now
> > > > > this KIP does not have much discussion involved yet and will slip
> > into
> > > the
> > > > > next release cadence.
> > > > >
> > > > > This KIP itself contains several parts itself: 1. refactoring the
> > > existing
> > > > > metrics hierarchy to cleanup some redundancy and also get more
> > > clarity; 2.
> > > > > add instance-level metrics like rebalance and state metrics, as
> well
> > as
> > > > > other static metrics.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pkleindl@gmail.com
> >
> > > wrote:
> > > > >
> > > > >> Hi Guozhang
> > > > >> Thanks for the KIP, this looks very helpful.
> > > > >> Could you please provide more detail on the metrics planned for
> the
> > > state?
> > > > >> We were just considering how to implement this ourselves because
> we
> > > need
> > > > >> to
> > > > >> track the history of stage changes.
> > > > >> The idea was to have an accumulated "seconds in state x" metric
> for
> > > every
> > > > >> state.
> > > > >> The new rebalance metric might solve part of our use case, but it
> is
> > > > >> interesting what you have planned for the state metric.
> > > > >> best regards
> > > > >> Patrik
> > > > >>
> > > > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > > >>
> > > > >> > Hello folks,
> > > > >> >
> > > > >> > I'd like to propose the following KIP to improve the Kafka
> Streams
> > > > >> metrics
> > > > >> > mechanism to users. This includes 1) a minor change in the
> public
> > > > >> > StreamsMetrics API, and 2) a major cleanup on the Streams' own
> > > built-in
> > > > >> > metrics hierarchy.
> > > > >> >
> > > > >> > Details can be found here:
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > > > >> >
> > > > >> > I'd love to hear your thoughts and feedbacks. Thanks!
> > > > >> >
> > > > >> > --
> > > > >> > -- Guozhang
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Boyang,

What do you mean by `per partition latency`?

Guozhang

On Mon, Jul 1, 2019 at 9:28 AM Boyang Chen <re...@gmail.com> wrote:

> Hey Guozhang,
>
> do we plan to add per partition latency in this KIP?
>
> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:
>
> > Hi Guozhang,
> >
> > Thank you for the KIP.
> >
> > 1) As far as I understand, the StreamsMetrics interface is there for
> > user-defined processors. Would it make sense to also add a method to
> > the interface to specify a sensor that records skipped records?
> >
> > 2) What are the semantics of active-task-process and standby-task-process
> >
> > 3) How do dropped-late-records and expired-window-record-drop relate
> > to each other? I guess the former is for records that fall outside the
> > grace period and the latter is for records that are processed after
> > the retention period of the window. Is this correct?
> >
> > 4) Is there an actual difference between skipped and dropped records?
> > If not, shall we unify the terminology?
> >
> > 5) What happens with removed metrics when the user sets the version of
> > "built.in.metrics.version" to 2.2-
> >
> > Best,
> > Bruno
> >
> > On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> > >
> > > Hello folks,
> > >
> > > As 2.3 is released now, I'd like to bump up this KIP discussion again
> for
> > > your reviews.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Hello Patrik,
> > > >
> > > > Since we are rolling out 2.3 and everyone is busy with the release
> now
> > > > this KIP does not have much discussion involved yet and will slip
> into
> > the
> > > > next release cadence.
> > > >
> > > > This KIP itself contains several parts itself: 1. refactoring the
> > existing
> > > > metrics hierarchy to cleanup some redundancy and also get more
> > clarity; 2.
> > > > add instance-level metrics like rebalance and state metrics, as well
> as
> > > > other static metrics.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pk...@gmail.com>
> > wrote:
> > > >
> > > >> Hi Guozhang
> > > >> Thanks for the KIP, this looks very helpful.
> > > >> Could you please provide more detail on the metrics planned for the
> > state?
> > > >> We were just considering how to implement this ourselves because we
> > need
> > > >> to
> > > >> track the history of stage changes.
> > > >> The idea was to have an accumulated "seconds in state x" metric for
> > every
> > > >> state.
> > > >> The new rebalance metric might solve part of our use case, but it is
> > > >> interesting what you have planned for the state metric.
> > > >> best regards
> > > >> Patrik
> > > >>
> > > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > > >>
> > > >> > Hello folks,
> > > >> >
> > > >> > I'd like to propose the following KIP to improve the Kafka Streams
> > > >> metrics
> > > >> > mechanism to users. This includes 1) a minor change in the public
> > > >> > StreamsMetrics API, and 2) a major cleanup on the Streams' own
> > built-in
> > > >> > metrics hierarchy.
> > > >> >
> > > >> > Details can be found here:
> > > >> >
> > > >> >
> > > >> >
> > > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > > >> >
> > > >> > I'd love to hear your thoughts and feedbacks. Thanks!
> > > >> >
> > > >> > --
> > > >> > -- Guozhang
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Boyang Chen <re...@gmail.com>.
Hey Guozhang,

Do we plan to add per partition latency in this KIP?

On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> Thank you for the KIP.
>
> 1) As far as I understand, the StreamsMetrics interface is there for
> user-defined processors. Would it make sense to also add a method to
> the interface to specify a sensor that records skipped records?
>
> 2) What are the semantics of active-task-process and standby-task-process
>
> 3) How do dropped-late-records and expired-window-record-drop relate
> to each other? I guess the former is for records that fall outside the
> grace period and the latter is for records that are processed after
> the retention period of the window. Is this correct?
>
> 4) Is there an actual difference between skipped and dropped records?
> If not, shall we unify the terminology?
>
> 5) What happens with removed metrics when the user sets the version of
> "built.in.metrics.version" to 2.2-
>
> Best,
> Bruno
>
> On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > Hello folks,
> >
> > As 2.3 is released now, I'd like to bump up this KIP discussion again for
> > your reviews.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hello Patrik,
> > >
> > > Since we are rolling out 2.3 and everyone is busy with the release now
> > > this KIP does not have much discussion involved yet and will slip into
> the
> > > next release cadence.
> > >
> > > This KIP itself contains several parts itself: 1. refactoring the
> existing
> > > metrics hierarchy to cleanup some redundancy and also get more
> clarity; 2.
> > > add instance-level metrics like rebalance and state metrics, as well as
> > > other static metrics.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pk...@gmail.com>
> wrote:
> > >
> > >> Hi Guozhang
> > >> Thanks for the KIP, this looks very helpful.
> > >> Could you please provide more detail on the metrics planned for the
> state?
> > >> We were just considering how to implement this ourselves because we
> need
> > >> to
> > >> track the history of stage changes.
> > >> The idea was to have an accumulated "seconds in state x" metric for
> every
> > >> state.
> > >> The new rebalance metric might solve part of our use case, but it is
> > >> interesting what you have planned for the state metric.
> > >> best regards
> > >> Patrik
> > >>
> > >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wa...@gmail.com>
> wrote:
> > >>
> > >> > Hello folks,
> > >> >
> > >> > I'd like to propose the following KIP to improve the Kafka Streams
> > >> metrics
> > >> > mechanism to users. This includes 1) a minor change in the public
> > >> > StreamsMetrics API, and 2) a major cleanup on the Streams' own
> built-in
> > >> > metrics hierarchy.
> > >> >
> > >> > Details can be found here:
> > >> >
> > >> >
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> > >> >
> > >> > I'd love to hear your thoughts and feedbacks. Thanks!
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >>
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
>

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Guozhang,

Thank you for the KIP.

1) As far as I understand, the StreamsMetrics interface is there for
user-defined processors. Would it make sense to also add a method to
the interface to specify a sensor that records skipped records?

2) What are the semantics of active-task-process and standby-task-process?

3) How do dropped-late-records and expired-window-record-drop relate
to each other? I guess the former is for records that fall outside the
grace period and the latter is for records that are processed after
the retention period of the window. Is this correct?

4) Is there an actual difference between skipped and dropped records?
If not, shall we unify the terminology?

5) What happens with removed metrics when the user sets the version of
"built.in.metrics.version" to "2.2-"?
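
To spell out the reading I am guessing at in 3), here is a small
self-contained sketch. It only illustrates that guess, not confirmed
Streams behavior: the class and method names are hypothetical, and the
exact boundary conditions (strict versus non-strict comparison) are
assumptions.

```java
// Hypothetical illustration of the two drop conditions guessed at in question 3.
class RecordDropSketch {

    // Guess for dropped-late-records: the record's window has already closed,
    // i.e. stream time has advanced past window end plus the grace period.
    static boolean pastGracePeriod(long windowEndMs, long graceMs, long streamTimeMs) {
        return streamTimeMs > windowEndMs + graceMs;
    }

    // Guess for expired-window-record-drop: the record is older than what the
    // window store still retains, measured against the current stream time.
    static boolean pastRetention(long recordTimestampMs, long retentionMs, long streamTimeMs) {
        return recordTimestampMs < streamTimeMs - retentionMs;
    }
}
```

Under this reading the two checks use different reference points (window
end versus record timestamp) and different durations (grace versus
retention), so a record could trip one check without the other.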

Best,
Bruno

On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> Hello folks,
>
> As 2.3 is released now, I'd like to bump up this KIP discussion again for
> your reviews.
>
>
> Guozhang
>
>
> On Thu, May 23, 2019 at 4:44 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Patrik,
> >
> > Since we are rolling out 2.3 and everyone is busy with the release now
> > this KIP does not have much discussion involved yet and will slip into the
> > next release cadence.
> >
> > This KIP itself contains several parts itself: 1. refactoring the existing
> > metrics hierarchy to cleanup some redundancy and also get more clarity; 2.
> > add instance-level metrics like rebalance and state metrics, as well as
> > other static metrics.
> >
> >
> > Guozhang
> >
> >
> >
> > On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <pk...@gmail.com> wrote:
> >
> >> Hi Guozhang
> >> Thanks for the KIP, this looks very helpful.
> >> Could you please provide more detail on the metrics planned for the state?
> >> We were just considering how to implement this ourselves because we need
> >> to
> >> track the history of stage changes.
> >> The idea was to have an accumulated "seconds in state x" metric for every
> >> state.
> >> The new rebalance metric might solve part of our use case, but it is
> >> interesting what you have planned for the state metric.
> >> best regards
> >> Patrik
> >>
> >> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <wa...@gmail.com> wrote:
> >>
> >> > Hello folks,
> >> >
> >> > I'd like to propose the following KIP to improve the Kafka Streams
> >> metrics
> >> > mechanism to users. This includes 1) a minor change in the public
> >> > StreamsMetrics API, and 2) a major cleanup on the Streams' own built-in
> >> > metrics hierarchy.
> >> >
> >> > Details can be found here:
> >> >
> >> >
> >> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> >> >
> >> > I'd love to hear your thoughts and feedbacks. Thanks!
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> -- Guozhang