Posted to dev@kafka.apache.org by Walker Carlson <wc...@confluent.io> on 2021/02/22 22:59:21 UTC

[DISCUSS] KIP-715: Expose Committed offset in streams

Hello all,

I would like to start a discussion on KIP-715. This KIP aims to make it
easier to monitor Kafka Streams progress by exposing the committed offset,
similar to how the consumer client does.

Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg

Best,
Walker

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by "Matthias J. Sax" <mj...@apache.org>.
Thanks for the KIP!

I personally think that it might be sufficient to just report the offsets
of assigned tasks. Similar to metrics, which are also reported only
locally, users can roll up/aggregate the information across instances
manually.
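
To illustrate the manual roll-up, here is a minimal sketch, not part of the
KIP. It assumes each instance has already exported its locally reported
committed offsets as a map keyed by a "topic-partition" string. The class name
`OffsetRollup`, the string keys, and the choice to keep the larger offset when
a partition is reported more than once are all assumptions for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not a Kafka Streams API.
final class OffsetRollup {
    private OffsetRollup() {}

    /**
     * Merges per-instance committed-offset reports into a single view.
     * Keys are "topic-partition" strings (an assumed export format).
     * If a partition appears in more than one report, the larger offset is kept.
     */
    static Map<String, Long> rollUp(List<Map<String, Long>> perInstanceReports) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> report : perInstanceReports) {
            for (Map.Entry<String, Long> entry : report.entrySet()) {
                // Insert the offset, or keep the maximum if the key is already present.
                merged.merge(entry.getKey(), entry.getValue(), Long::max);
            }
        }
        return merged;
    }
}
```

A collector would call `OffsetRollup.rollUp(...)` with one map per instance and
compare the merged result across successive polls to see whether progress is
being made.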

What I also don't understand is what "idling" means.


-Matthias

On 2/22/21 3:11 PM, Boyang Chen wrote:
> Thanks Walker for the proposed KIP! This should definitely empower KStream
> users with better visibility.
> 
> Meanwhile I got a couple of questions/suggestions:
> 
> 
> 1. typo "repost/report" in the motivation section.
> 
> 2. What offsets do we report when the task is under restoration or
> rebalancing?
> 
> 3. IIUC, we should clearly state that our reported metrics are based off
> locally assigned tasks for each instance.
> 
> 4. In the meantime, what’s our strategy to report tasks that are not local
> to the instance? Users would normally try to monitor all the possible
> tasks, and it’s unfortunate we couldn’t determine whether we have lost
> tasks. My brainstorming was whether it makes sense for the leader instance
> to report the task progress as -1 for all “supposed to be running” tasks,
> so that on the metrics collector side it could catch any missing tasks.
> 
> 5. It seems not clear how users should use `isTaskIdling`. Why not report a
> map/set for idling tasks just as what we did for committed offsets?
> 
> 6. Why do we use TopicPartition instead of TaskId as the key in the
> returned map?
> 7. Could we include some details in where we got the commit offsets for
> each task? Is it through consumer offset fetch, or the stream processing
> progress based on the records fetched?
> 
> 
> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <wc...@confluent.io>
> wrote:
> 
>> Hello all,
>>
>> I would like to start discussion on KIP-715. This kip aims to make it
>> easier to monitor Kafka Streams progress by exposing the committed offset
>> in a similar way as the consumer client does.
>>
>> Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg
>>
>> Best,
>> Walker
>>
> 

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by "Matthias J. Sax" <mj...@apache.org>.
I think calling it endOffset is still fine.

We should keep it "simple" for users and not introduce too many concepts.


-Matthias

On 3/2/21 8:14 AM, Walker Carlson wrote:
> Okay, we can document that if the state is rebalancing, a task could be
> between instances and so not show up in one localThreadMetadata call, but
> this should not cause a problem for repeated calls.
> 
> Bruno, to your questions: the endOffset is like the consumer's
> highWatermark and does not require a remote call. It seems this name is
> confusing, and I should change it from endOffset to HighWatermark to
> match the consumer.
> 
> walker
> 
> On Tue, Mar 2, 2021 at 4:43 AM Bruno Cadonna <br...@confluent.io> wrote:
> 
>> Hi Walker,
>>
>> Thank you for the KIP!
>>
>> I somehow agree that we should document that some tasks may be missing.
>>
>> I have one question/comment. As far as I understand, your KIP adds two
>> methods that return data that is actually hosted on the brokers, namely
>> committedOffsets() and endOffsets(). Thus, we need a remote call to
>> fetch the data and consequently the cost of calling
>> localThreadMetaData() might increase substantially. I understand, that
>> for committedOffsets(), we could avoid the remote call by maintaining
>> the committedOffsets() locally, but can we also avoid the remote call
>> for endOffsets()? Should we allow users to pass a parameter to
>> localThreadMetaData() that skips the metadata that needs remote calls to
>> keep the costs for use cases that do not need the end offsets low?
>>
>> Best,
>> Bruno
>>
>> On 02.03.21 02:18, Matthias J. Sax wrote:
>>>> but the user should
>>>> not rely on all tasks being returned at any given time to begin with
>> since
>>>> it's possible we are in between revoking and re-assigning a partition.
>>>
>>> Exactly. That is what I meant: the "hand off" phase of partitions during
>>> a rebalance. During this phase, some tasks are "missing" if you
>>> aggregate the information globally. My point was (even if it might be
>>> obvious to us) that it seems to be worth pointing out in the KIPs and in
>>> the docs.
>>>
>>> I meant "partial information" from a global POV (not partial for a
>>> single local instance).
>>>
>>>> Also I mention that they return the highest value they had seen
>>>> so far for any tasks they have assigned to them.
>>>
>>> For the shutdown case maybe, but after a task is closed its metadata
>>> should not be returned any longer IMHO.
>>>
>>>
>>> -Matthias
>>>
>>> On 3/1/21 4:46 PM, Walker Carlson wrote:
>>>> I updated to use Optional, good idea Matthias.
>>>>
>>>> For the localThreadMetadata, it could already be called during a
>>>> rebalance. Also I mention that they return the highest value they had
>> seen
>>>> so far for any tasks they have assigned to them. I thought it would be
>>>> useful to see the TaskMetadata while the Threads were shutting down. I
>>>> think that there shouldn't really be partial information. If you think
>> this
>>>> should be clarified better let me know.
>>>>
>>>> walker
>>>>
>>>> On Mon, Mar 1, 2021 at 3:45 PM Sophie Blee-Goldman <sophie@confluent.io
>>>
>>>> wrote:
>>>>
>>>>> Can you clarify your second question Matthias? If this is queried
>> during
>>>>> a cooperative rebalance, it should return the tasks as usual. If the
>> user
>>>>> is
>>>>> using eager rebalancing then this will not return any tasks, but the
>> user
>>>>> should
>>>>> not rely on all tasks being returned at any given time to begin with
>> since
>>>>> it's
>>>>> possible we are in between revoking and re-assigning a partition.
>>>>>
>>>>> What does "partial information" mean?
>>>>>
>>>>> (btw I agree that an Optional makes sense for
>> timeCurrentIdlingStarted())
>>>>>
>>>>> On Mon, Mar 1, 2021 at 11:46 AM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>>>
>>>>>> Thanks for updating the KIP, Walker.
>>>>>>
>>>>>> About, `timeCurrentIdlingStarted()`: should we return an `Optional`
>>>>>> instead of `-1` if a task is not idling.
>>>>>>
>>>>>>
>>>>>> As we allow to call `localThreadMetadata()` any time, could it be that
>>>>>> we report partial information during a rebalance? If yes, this should
>> be
>>>>>> pointed out, because if one wants to implement a health check this
>> needs
>>>>>> to be taken into account.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 2/27/21 11:32 AM, Walker Carlson wrote:
>>>>>>> Sure thing Boyang,
>>>>>>>
>>>>>>> 1) it is in proposed changes. I expanded on it a bit more now.
>>>>>>> 2) done
>>>>>>> 3) and done :)
>>>>>>>
>>>>>>> thanks for the suggestions,
>>>>>>> walker
>>>>>>>
>>>>>>> On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <
>>>>> reluctanthero104@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Walker. Some minor comments:
>>>>>>>>
>>>>>>>> 1. Could you add a reference to localThreadMetadata method in the
>> KIP?
>>>>>>>> 2. Could you make the code block as a java template, such that
>>>>>>>> TaskMetadata.java could be as the template title? Also it would be
>>>>> good
>>>>>> to
>>>>>>>> add some meta comments about the newly added functions.
>>>>>>>> 3. Could you write more details about rejected alternatives? Just as
>>>>>> why we
>>>>>>>> don't choose to expose as metrics, and how a new method on KStream
>> is
>>>>>> not
>>>>>>>> favorable. These would be valuable when we look back on our design
>>>>>>>> decisions.
>>>>>>>>
>>>>>>>> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <
>>>>> wcarlson@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I understand now. I think that is a valid concern but I think it is
>>>>>> best
>>>>>>>>> solved by having an external service verify through streams. As
>> this
>>>>>> KIP
>>>>>>>>> is now just adding fields to TaskMetadata to be returned in the
>>>>>>>>> threadMetadata I am going to say that is out of scope.
>>>>>>>>>
>>>>>>>>> That seems to be the last concern. If there are no others I will
>> put
>>>>>> this
>>>>>>>>> up for a vote soon.
>>>>>>>>>
>>>>>>>>> walker
>>>>>>>>>
>>>>>>>>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <
>>>>>> reluctanthero104@gmail.com
>>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> For the 3rd point, yes, what I'm proposing is an edge case. For
>>>>>>>> example,
>>>>>>>>>> when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in
>> rebalancing
>>>>>>>> logic
>>>>>>>>>> causing no one gets 1_1 assigned. Then the health check service
>> will
>>>>>>>> only
>>>>>>>>>> see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
>>>>>>>> paying
>>>>>>>>>> attention to 1_1. What I want to expose is a "logical global" view
>>>>> of
>>>>>>>> all
>>>>>>>>>> the tasks through the stream instance, since each instance gets
>> the
>>>>>>>>>> assigned topology and should be able to infer all the exact tasks
>> to
>>>>>> be
>>>>>>>>> up
>>>>>>>>>> and running when the service is healthy.
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <
>>>>>> wcarlson@confluent.io
>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the follow up Boyang and Guozhang,
>>>>>>>>>>>
>>>>>>>>>>> I have updated the kip to include these ideas.
>>>>>>>>>>>
>>>>>>>>>>> Guozhang, that is a good idea about using the TaskMetadata. We
>> can
>>>>>>>> get
>>>>>>>>> it
>>>>>>>>>>> through the ThreadMetadata with a minor change to
>>>>>>>> `localThreadMetadata`
>>>>>>>>>> in
>>>>>>>>>>> kafkaStreams. This means that we will only need to update
>>>>>>>> TaskMetadata
>>>>>>>>>> and
>>>>>>>>>>> add no other APIs
>>>>>>>>>>>
>>>>>>>>>>> Boyang, since each TaskMetadata contains the TaskId and
>>>>>>>>> TopicPartitions I
>>>>>>>>>>> don't believe mapping either way will be a problem. Also I think
>> we
>>>>>>>> can
>>>>>>>>>> do
>>>>>>>>>>> something like record the time the task started idling and when
>> it
>>>>>>>>> stops
>>>>>>>>>>> idling we can override it to -1. I think that should clear up the
>>>>>>>> first
>>>>>>>>>> two
>>>>>>>>>>> points.
>>>>>>>>>>>
>>>>>>>>>>> As for your third point I am not sure I 100% understand. The
>>>>>>>>>> ThreadMetadata
>>>>>>>>>>> will contain a set of all task assigned to that thread. Any
>> health
>>>>>>>>> check
>>>>>>>>>>> service will just need to query all clients and aggregate their
>>>>>>>>> responses
>>>>>>>>>>> to get a complete picture of all tasks correct?
>>>>>>>>>>>
>>>>>>>>>>> walker
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <
>> wangguoz@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest
>>>>> we
>>>>>>>>>>>> consolidate on the existing `TaskMetadata` since we have already
>>>>>>>>>>>> accumulated a bunch of such classes, and its better to keep them
>>>>>>>>> small
>>>>>>>>>> as
>>>>>>>>>>>> public APIs. You can see
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12370
>>>>>>>>>>>> for a reference and a proposal.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
>>>>>>>>>> reluctanthero104@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the updates Walker. Some replies and follow-up
>>>>>>>>> questions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. I agree one task could have multiple partitions, but when we
>>>>>>>>> hit a
>>>>>>>>>>>> delay
>>>>>>>>>>>>> in terms of offset progress, do we have a convenient way to
>>>>>>>> reverse
>>>>>>>>>>>> mapping
>>>>>>>>>>>>> TopicPartition to the problematic task? In production, I
>> believe
>>>>>>>> it
>>>>>>>>>>> would
>>>>>>>>>>>>> be much quicker to identify the problem using task.id instead
>> of
>>>>>>>>>> topic
>>>>>>>>>>>>> partition, especially when it points to an internal topic. I
>>>>>>>> think
>>>>>>>>>>> having
>>>>>>>>>>>>> the task id as part of the entry value seems useful, which
>> means
>>>>>>>>>>> getting
>>>>>>>>>>>>> something like Map<TopicPartition, TaskProgress> where
>>>>>>>> TaskProgress
>>>>>>>>>>>>> contains both committed offsets & task id.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. The task idling API was still confusing. I don't think we
>> care
>>>>>>>>>> about
>>>>>>>>>>>> the
>>>>>>>>>>>>> exact state when making tasksIdling() query, instead we care
>> more
>>>>>>>>>> about
>>>>>>>>>>>> how
>>>>>>>>>>>>> long one task has been in idle state since when you called,
>> which
>>>>>>>>>>>> reflects
>>>>>>>>>>>>> whether it is a normal idling period. So I feel it might be
>>>>>>>> helpful
>>>>>>>>>> to
>>>>>>>>>>>>> track that time difference and report it in the TaskStatus
>>>>>>>> struct.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. What I want to achieve to have some global mapping of either
>>>>>>>>>>>>> TopicPartition or TaskId was that it is not possible for a
>> health
>>>>>>>>>> check
>>>>>>>>>>>>> service to report a task failure that doesn't emit any metrics.
>>>>>>>> So
>>>>>>>>> as
>>>>>>>>>>>> long
>>>>>>>>>>>>> as we have a global topic partition API, health check could
>>>>>>>> always
>>>>>>>>> be
>>>>>>>>>>>> aware
>>>>>>>>>>>>> of any task/partition not reporting its progress, does that
>> make
>>>>>>>>>> sense?
>>>>>>>>>>>> If
>>>>>>>>>>>>> you feel we have a better way to achieve this, such as querying
>>>>>>>> all
>>>>>>>>>> the
>>>>>>>>>>>>> input/intermediate topic metadata directly from Kafka for the
>>>>>>>>>>> baseline, I
>>>>>>>>>>>>> think that should be good as well and worth mentioning it in
>> the
>>>>>>>>> KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also it seems that the KIP hasn't reflected what you proposed
>> for
>>>>>>>>> the
>>>>>>>>>>>> task
>>>>>>>>>>>>> idling status.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Boyang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
>>>>>>>>>> wcarlson@confluent.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you for the comments everyone!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think there are a few things I can clear up in general then
>> I
>>>>>>>>>> will
>>>>>>>>>>>>>> specifically respond to each question.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> First, when I say "idling" I refer to task idling. Where the
>>>>>>>>> stream
>>>>>>>>>>> is
>>>>>>>>>>>>>> intentionally not making progress. (
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10091 is an
>>>>>>>>> example).
>>>>>>>>>>> This
>>>>>>>>>>>>>> becomes relevant if a task is waiting on one partition with no
>>>>>>>>> data
>>>>>>>>>>> but
>>>>>>>>>>>>>> that is holding up a partition with data. That would cause one
>>>>>>>>> just
>>>>>>>>>>>>> looking
>>>>>>>>>>>>>> at the committed offset changes to believe the task has a
>>>>>>>> problem
>>>>>>>>>>> when
>>>>>>>>>>>> it
>>>>>>>>>>>>>> is working as intended.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In light of this confusion. I plan to change tasksIdling() to
>>>>>>>>>>>>> `Map<TaskId,
>>>>>>>>>>>>>> TaskStatus> getTasksStatus()` this should hopefully make it
>>>>>>>> more
>>>>>>>>>>> clear
>>>>>>>>>>>>> what
>>>>>>>>>>>>>> is being exposed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> TaskStatus would include: TopicPartitions, TaskId,
>>>>>>>>> ProcessorTopology,
>>>>>>>>>>>>> Idling,
>>>>>>>>>>>>>> and State.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Boyang:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2) I think that each task should report on whatever
>>>>>>>>> TopicPartitions
>>>>>>>>>>>> they
>>>>>>>>>>>>>> hold, this means a Topic Partition might get reported twice
>> but
>>>>>>>>> the
>>>>>>>>>>>> user
>>>>>>>>>>>>>> can roll those up and use the larger one when looking at the
>>>>>>>>> whole
>>>>>>>>>>> app.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4) If the user collects the committed offsets across all the
>>>>>>>>>> running
>>>>>>>>>>>>>> clients there shouldn't be any tasks missing correct?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and
>>>>>>>>>>>> TopicPartitions I
>>>>>>>>>>>>>> think it is cleaner to report them separately.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) Yes, that was my original plan but it made more sense to
>>>>>>>>> mirror
>>>>>>>>>>> how
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> consumer exposes the committed offset.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3) That is a good point. I think that we should include
>>>>>>>> internal
>>>>>>>>>>> topics
>>>>>>>>>>>>> as
>>>>>>>>>>>>>> well. I think that if the topology were to evolve there should
>>>>>>>> be
>>>>>>>>>>> fair
>>>>>>>>>>>>>> warning anyways. Maybe you can clarify what would be limited
>> by
>>>>>>>>>>>> exposing
>>>>>>>>>>>>>> the interior topics here? I thought a user could find them in
>>>>>>>>> other
>>>>>>>>>>>> ways.
>>>>>>>>>>>>>> If it is the name we could anonymize them before exposing them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you all for your comments. If I did not respond directly
>>>>>>>> to
>>>>>>>>>> one
>>>>>>>>>>>> of
>>>>>>>>>>>>>> your questions I updated the kip to include the details it was
>>>>>>>>>>>>> requesting.
>>>>>>>>>>>>>> I did not include my proposed changes mentioned earlier as
>> I
>>>>>>>>>> would
>>>>>>>>>>>>> like
>>>>>>>>>>>>>> to get some feedback about what to include in TaskStatus and
>> in
>>>>>>>>>>>> general.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>> Walker
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <
>>>>>>>>> wangguoz@gmail.com
>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) Have you considered just relying on the
>>>>>>>>>> `KafkaStreams#metrics()`
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> includes embedded consumer metrics that have the committed
>>>>>>>>>> offsets
>>>>>>>>>>>>>>> instead of adding a new API? Not advocating that this is a
>>>>>>>>> better
>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>> but want to make sure we considered all options before we
>>>>>>>> come
>>>>>>>>> to
>>>>>>>>>>> the
>>>>>>>>>>>>>> "last
>>>>>>>>>>>>>>> resort" of adding new public interfaces.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but
>>>>>>>>> the
>>>>>>>>>>>>> returned
>>>>>>>>>>>>>>> map is on partitions. I think we should make the javadoc and
>>>>>>>>> the
>>>>>>>>>>>> return
>>>>>>>>>>>>>>> types consistent, either tasks or topic partitions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3) In addition, if for 2) above we ended up with topic
>>>>>>>>>> partitions,
>>>>>>>>>>>> then
>>>>>>>>>>>>>>> would they include only external source topics, or also
>>>>>>>>> including
>>>>>>>>>>>>>> internal
>>>>>>>>>>>>>>> repartition / changelog topics? I think including only
>>>>>>>> external
>>>>>>>>>>>> source
>>>>>>>>>>>>>>> topic partitions are not sufficient for your goal of tracking
>>>>>>>>>>>> progress,
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>> exposing internal topic names are also a big commitment here
>>>>>>>>> for
>>>>>>>>>>>> future
>>>>>>>>>>>>>>> topology evolution.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 4)  For "tasksIdling", I'm wondering if we can make it more
>>>>>>>>>>> general,
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> the returned value is not just a boolean, but a TaskState
>>>>>>>> that
>>>>>>>>>> can
>>>>>>>>>>> be
>>>>>>>>>>>>> an
>>>>>>>>>>>>>>> enum of "created, restoring, running, idle, closing". This
>>>>>>>>> could
>>>>>>>>>>> help
>>>>>>>>>>>>> us
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>> the future to track other things like restoration efficiency
>>>>>>>>> and
>>>>>>>>>>>>>> rebalance
>>>>>>>>>>>>>>> efficiency etc.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 5) We need to clarify how is "idling" being defined here:
>>>>>>>> e.g.
>>>>>>>>> we
>>>>>>>>>>> can
>>>>>>>>>>>>>>> clearly state that a task is considered idle only if 1) lag
>>>>>>>> is
>>>>>>>>>>>>>>> increasing, indicating that there are indeed new records
>>>>>>>>> arrived
>>>>>>>>>> at
>>>>>>>>>>>>>> source,
>>>>>>>>>>>>>>> while committed offset is not advancing, AND 2) produced
>>>>>>>> offset
>>>>>>>>>>>>> (imagine
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>> may have punctuations that generate new data to the output
>>>>>>>>> topic
>>>>>>>>>>> even
>>>>>>>>>>>>> if
>>>>>>>>>>>>>>> there's no input for a while) is not advancing either.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
>>>>>>>>>>>>> reluctanthero104@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely
>>>>>>>>>>> empower
>>>>>>>>>>>>>>> KStream
>>>>>>>>>>>>>>>> users with better visibility.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Meanwhile I got a couple of questions/suggestions:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. typo "repost/report" in the motivation section.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2. What offsets do we report when the task is under
>>>>>>>>> restoration
>>>>>>>>>>> or
>>>>>>>>>>>>>>>> rebalancing?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 3. IIUC, we should clearly state that our reported metrics
>>>>>>>>> are
>>>>>>>>>>>> based
>>>>>>>>>>>>>> off
>>>>>>>>>>>>>>>> locally assigned tasks for each instance.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 4. In the meantime, what’s our strategy to report tasks
>>>>>>>> that
>>>>>>>>>> are
>>>>>>>>>>>> not
>>>>>>>>>>>>>>> local
>>>>>>>>>>>>>>>> to the instance? Users would normally try to monitor all
>>>>>>>> the
>>>>>>>>>>>> possible
>>>>>>>>>>>>>>>> tasks, and it’s unfortunate we couldn’t determine whether
>>>>>>>> we
>>>>>>>>>> have
>>>>>>>>>>>>> lost
>>>>>>>>>>>>>>>> tasks. My brainstorming was whether it makes sense for the
>>>>>>>>>> leader
>>>>>>>>>>>>>>> instance
>>>>>>>>>>>>>>>> to report the task progress as -1 for all “supposed to be
>>>>>>>>>>> running”
>>>>>>>>>>>>>> tasks,
>>>>>>>>>>>>>>>> so that on the metrics collector side it could catch any
>>>>>>>>>> missing
>>>>>>>>>>>>> tasks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 5. It seems not clear how users should use `isTaskIdling`.
>>>>>>>>> Why
>>>>>>>>>>> not
>>>>>>>>>>>>>>> report a
>>>>>>>>>>>>>>>> map/set for idling tasks just as what we did for committed
>>>>>>>>>>> offsets?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 6. Why do we use TopicPartition instead of TaskId as the
>>>>>>>> key
>>>>>>>>> in
>>>>>>>>>>> the
>>>>>>>>>>>>>>>> returned map?
>>>>>>>>>>>>>>>> 7. Could we include some details in where we got the commit
>>>>>>>>>>> offsets
>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>> each task? Is it through consumer offset fetch, or the
>>>>>>>> stream
>>>>>>>>>>>>>> processing
>>>>>>>>>>>>>>>> progress based on the records fetched?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
>>>>>>>>>>>>> wcarlson@confluent.io>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would like to start discussion on KIP-715. This kip
>>>>>>>> aims
>>>>>>>>> to
>>>>>>>>>>>> make
>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>> easier to monitor Kafka Streams progress by exposing the
>>>>>>>>>>>> committed
>>>>>>>>>>>>>>> offset
>>>>>>>>>>>>>>>>> in a similar way as the consumer client does.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Here is the KIP:
>>>>>>>>>> https://cwiki.apache.org/confluence/x/aRRRCg
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Walker
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
> 

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Walker Carlson <wc...@confluent.io>.
Okay, we can document that if the state is rebalancing, a task could be
between instances and so not show up in one localThreadMetadata call, but
this should not cause a problem for repeated calls.
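
As a rough sketch of how a health check could tolerate this hand-off window:
only flag a task once it has been absent from several consecutive polls. This
is an illustration under stated assumptions, not something the KIP provides.
The class `MissingTaskTracker`, the string task ids, the caller-supplied set
of expected tasks, and the threshold are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical health-check helper; not a Kafka Streams API.
final class MissingTaskTracker {
    private final Set<String> expectedTaskIds;
    private final int threshold;
    // Number of consecutive polls in which each expected task was absent.
    private final Map<String, Integer> consecutiveMisses = new HashMap<>();

    /**
     * @param expectedTaskIds task ids the caller believes should be running (assumed known)
     * @param threshold       consecutive absences required before a task is reported
     */
    MissingTaskTracker(Set<String> expectedTaskIds, int threshold) {
        this.expectedTaskIds = new TreeSet<>(expectedTaskIds);
        this.threshold = threshold;
    }

    /**
     * Records one poll. seenTaskIds is the union of task ids reported by all
     * instances in that poll. Returns the tasks absent for at least
     * `threshold` consecutive polls, including this one.
     */
    Set<String> observe(Set<String> seenTaskIds) {
        Set<String> missing = new TreeSet<>();
        for (String taskId : expectedTaskIds) {
            if (seenTaskIds.contains(taskId)) {
                consecutiveMisses.remove(taskId); // seen again: reset the counter
            } else {
                int misses = consecutiveMisses.merge(taskId, 1, Integer::sum);
                if (misses >= threshold) {
                    missing.add(taskId);
                }
            }
        }
        return missing;
    }
}
```

With a threshold of 2, a task that is merely between instances during one poll
is not reported, while a task that never reappears is.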

Bruno, to your questions: the endOffset is like the consumer's
highWatermark and does not require a remote call. It seems this name is
confusing, and I should change it from endOffset to HighWatermark to
match the consumer.
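
For context, one way a monitoring tool might combine the two values is to
compute a per-partition lag as end offset minus committed offset. The sketch
below is only an illustration. `OffsetLag`, the "topic-partition" string keys,
skipping partitions without a committed offset, and clamping at zero are
assumptions, not behavior defined by the KIP.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not a Kafka Streams API.
final class OffsetLag {
    private OffsetLag() {}

    /**
     * Computes lag per partition as endOffset - committedOffset.
     * Partitions with no committed offset are skipped because their lag is unknown.
     * The result is clamped at zero in case the locally known end offset is stale.
     */
    static Map<String, Long> lag(Map<String, Long> committedOffsets,
                                 Map<String, Long> endOffsets) {
        Map<String, Long> lag = new TreeMap<>();
        for (Map.Entry<String, Long> end : endOffsets.entrySet()) {
            Long committed = committedOffsets.get(end.getKey());
            if (committed == null) {
                continue; // nothing committed yet for this partition
            }
            lag.put(end.getKey(), Math.max(0L, end.getValue() - committed));
        }
        return lag;
    }
}
```

A lag that keeps growing while the committed offset stays flat is the kind of
signal that the idling information is meant to help interpret.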

walker

On Tue, Mar 2, 2021 at 4:43 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Walker,
>
> Thank you for the KIP!
>
> I somehow agree that we should document that some tasks may be missing.
>
> I have one question/comment. As far as I understand, your KIP adds two
> methods that return data that is actually hosted on the brokers, namely
> committedOffsets() and endOffsets(). Thus, we need a remote call to
> fetch the data and consequently the cost of calling
> localThreadMetaData() might increase substantially. I understand, that
> for committedOffsets(), we could avoid the remote call by maintaining
> the committedOffsets() locally, but can we also avoid the remote call
> for endOffsets()? Should we allow users to pass a parameter to
> localThreadMetaData() that skips the metadata that needs remote calls to
> keep the costs for use cases that do not need the end offsets low?
>
> Best,
> Bruno
>
> On 02.03.21 02:18, Matthias J. Sax wrote:
> >> but the user should
> >> not rely on all tasks being returned at any given time to begin with
> since
> >> it's possible we are in between revoking and re-assigning a partition.
> >
> > Exactly. That is what I meant: the "hand off" phase of partitions during
> > a rebalance. During this phase, some tasks are "missing" if you
> > aggregate the information globally. My point was (even if it might be
> > obvious to us) that it seems to be worth pointing out in the KIPs and in
> > the docs.
> >
> > I meant "partial information" from a global POV (not partial for a
> > single local instance).
> >
> >> Also I mention that they return the highest value they had seen
> >> so far for any tasks they have assigned to them.
> >
> > For the shutdown case maybe, but after a task is closed its metadata
> > should not be returned any longer IMHO.
> >
> >
> > -Matthias
> >
> > On 3/1/21 4:46 PM, Walker Carlson wrote:
> >> I updated to use Optional, good idea Matthias.
> >>
> >> For the localThreadMetadata, it could already be called during a
> >> rebalance. Also I mention that they return the highest value they had
> seen
> >> so far for any tasks they have assigned to them. I thought it would be
> >> useful to see the TaskMetadata while the Threads were shutting down. I
> >> think that there shouldn't really be partial information. If you think
> this
> >> should be clarified better let me know.
> >>
> >> walker
> >>
> >> On Mon, Mar 1, 2021 at 3:45 PM Sophie Blee-Goldman <sophie@confluent.io
> >
> >> wrote:
> >>
> >>> Can you clarify your second question Matthias? If this is queried
> during
> >>> a cooperative rebalance, it should return the tasks as usual. If the
> user
> >>> is
> >>> using eager rebalancing then this will not return any tasks, but the
> user
> >>> should
> >>> not rely on all tasks being returned at any given time to begin with
> since
> >>> it's
> >>> possible we are in between revoking and re-assigning a partition.
> >>>
> >>> What does "partial information" mean?
> >>>
> >>> (btw I agree that an Optional makes sense for
> timeCurrentIdlingStarted())
> >>>
> >>> On Mon, Mar 1, 2021 at 11:46 AM Matthias J. Sax <mj...@apache.org>
> wrote:
> >>>
> >>>> Thanks for updating the KIP, Walker.
> >>>>
> >>>> About, `timeCurrentIdlingStarted()`: should we return an `Optional`
> >>>> instead of `-1` if a task is not idling.
> >>>>
> >>>>
> >>>> As we allow to call `localThreadMetadata()` any time, could it be that
> >>>> we report partial information during a rebalance? If yes, this should
> be
> >>>> pointed out, because if one wants to implement a health check this
> needs
> >>>> to be taken into account.
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 2/27/21 11:32 AM, Walker Carlson wrote:
> >>>>> Sure thing Boyang,
> >>>>>
> >>>>> 1) it is in proposed changes. I expanded on it a bit more now.
> >>>>> 2) done
> >>>>> 3) and done :)
> >>>>>
> >>>>> thanks for the suggestions,
> >>>>> walker
> >>>>>
> >>>>> On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <
> >>> reluctanthero104@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks Walker. Some minor comments:
> >>>>>>
> >>>>>> 1. Could you add a reference to localThreadMetadata method in the
> KIP?
> >>>>>> 2. Could you make the code block as a java template, such that
> >>>>>> TaskMetadata.java could be as the template title? Also it would be
> >>> good
> >>>> to
> >>>>>> add some meta comments about the newly added functions.
> >>>>>> 3. Could you write more details about rejected alternatives? Just as
> >>>> why we
> >>>>>> don't choose to expose as metrics, and how a new method on KStream
> is
> >>>> not
> >>>>>> favorable. These would be valuable when we look back on our design
> >>>>>> decisions.
> >>>>>>
> >>>>>> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <
> >>> wcarlson@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> I understand now. I think that is a valid concern but I think it is
> >>>> best
> >>>>>>> solved by having an external service verify through streams. As
> this
> >>>> KIP
> >>>>>>> is now just adding fields to TaskMetadata to be returned in the
> >>>>>>> threadMetadata I am going to say that is out of scope.
> >>>>>>>
> >>>>>>> That seems to be the last concern. If there are no others I will
> put
> >>>> this
> >>>>>>> up for a vote soon.
> >>>>>>>
> >>>>>>> walker
> >>>>>>>
> >>>>>>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <
> >>>> reluctanthero104@gmail.com
> >>>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> For the 3rd point, yes, what I'm proposing is an edge case. For
> >>>>>> example,
> >>>>>>>> when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in
> rebalancing
> >>>>>> logic
> >>>>>>>> causing no one gets 1_1 assigned. Then the health check service
> will
> >>>>>> only
> >>>>>>>> see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
> >>>>>> paying
> >>>>>>>> attention to 1_1. What I want to expose is a "logical global" view
> >>> of
> >>>>>> all
> >>>>>>>> the tasks through the stream instance, since each instance gets
> the
> >>>>>>>> assigned topology and should be able to infer all the exact tasks
> to
> >>>> be
> >>>>>>> up
> >>>>>>>> and running when the service is healthy.
> >>>>>>>>
> >>>>>>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <
> >>>> wcarlson@confluent.io
> >>>>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for the follow up Boyang and Guozhang,
> >>>>>>>>>
> >>>>>>>>> I have updated the kip to include these ideas.
> >>>>>>>>>
> >>>>>>>>> Guozhang, that is a good idea about using the TaskMetadata. We
> can
> >>>>>> get
> >>>>>>> it
> >>>>>>>>> through the ThreadMetadata with a minor change to
> >>>>>> `localThreadMetadata`
> >>>>>>>> in
> >>>>>>>>> kafkaStreams. This means that we will only need to update
> >>>>>> TaskMetadata
> >>>>>>>> and
> >>>>>>>>> add no other APIs
> >>>>>>>>>
> >>>>>>>>> Boyang, since each TaskMetadata contains the TaskId and
> >>>>>>> TopicPartitions I
> >>>>>>>>> don't believe mapping either way will be a problem. Also I think
> we
> >>>>>> can
> >>>>>>>> do
> >>>>>>>>> something like record the time the task started idling and when
> it
> >>>>>>> stops
> >>>>>>>>> idling we can override it to -1. I think that should clear up the
> >>>>>> first
> >>>>>>>> two
> >>>>>>>>> points.
> >>>>>>>>>
> >>>>>>>>> As for your third point I am not sure I 100% understand. The
> >>>>>>>> ThreadMetadata
> >>>>>>>>> will contain a set of all task assigned to that thread. Any
> health
> >>>>>>> check
> >>>>>>>>> service will just need to query all clients and aggregate their
> >>>>>>> responses
> >>>>>>>>> to get a complete picture of all tasks correct?
> >>>>>>>>>
> >>>>>>>>> walker
> >>>>>>>>>
> >>>>>>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <
> wangguoz@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest
> >>> we
> >>>>>>>>>> consolidate on the existing `TaskMetadata` since we have already
> >>>>>>>>>> accumulated a bunch of such classes, and it's better to keep them
> >>>>>>> small
> >>>>>>>> as
> >>>>>>>>>> public APIs. You can see
> >>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12370
> >>>>>>>>>> for a reference and a proposal.
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
> >>>>>>>> reluctanthero104@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks for the updates Walker. Some replies and follow-up
> >>>>>>> questions:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. I agree one task could have multiple partitions, but when we
> >>>>>>> hit a
> >>>>>>>>>> delay
> >>>>>>>>>>> in terms of offset progress, do we have a convenient way to
> >>>>>> reverse
> >>>>>>>>>> mapping
> >>>>>>>>>>> TopicPartition to the problematic task? In production, I
> believe
> >>>>>> it
> >>>>>>>>> would
> >>>>>>>>>>> be much quicker to identify the problem using task.id instead
> of
> >>>>>>>> topic
> >>>>>>>>>>> partition, especially when it points to an internal topic. I
> >>>>>> think
> >>>>>>>>> having
> >>>>>>>>>>> the task id as part of the entry value seems useful, which
> means
> >>>>>>>>> getting
> >>>>>>>>>>> something like Map<TopicPartition, TaskProgress> where
> >>>>>> TaskProgress
> >>>>>>>>>>> contains both committed offsets & task id.
> >>>>>>>>>>>
> >>>>>>>>>>> 2. The task idling API was still confusing. I don't think we
> care
> >>>>>>>> about
> >>>>>>>>>> the
> >>>>>>>>>>> exact state when making tasksIdling() query, instead we care
> more
> >>>>>>>> about
> >>>>>>>>>> how
> >>>>>>>>>>> long one task has been in idle state since when you called,
> which
> >>>>>>>>>> reflects
> >>>>>>>>>>> whether it is a normal idling period. So I feel it might be
> >>>>>> helpful
> >>>>>>>> to
> >>>>>>>>>>> track that time difference and report it in the TaskStatus
> >>>>>> struct.
> >>>>>>>>>>>
> >>>>>>>>>>> 3. What I want to achieve to have some global mapping of either
> >>>>>>>>>>> TopicPartition or TaskId was that it is not possible for a
> health
> >>>>>>>> check
> >>>>>>>>>>> service to report a task failure that doesn't emit any metrics.
> >>>>>> So
> >>>>>>> as
> >>>>>>>>>> long
> >>>>>>>>>>> as we have a global topic partition API, health check could
> >>>>>> always
> >>>>>>> be
> >>>>>>>>>> aware
> >>>>>>>>>>> of any task/partition not reporting its progress, does that
> make
> >>>>>>>> sense?
> >>>>>>>>>> If
> >>>>>>>>>>> you feel we have a better way to achieve this, such as querying
> >>>>>> all
> >>>>>>>> the
> >>>>>>>>>>> input/intermediate topic metadata directly from Kafka for the
> >>>>>>>>> baseline, I
> >>>>>>>>>>> think that should be good as well and worth mentioning it in
> the
> >>>>>>> KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> Also it seems that the KIP hasn't reflected what you proposed
> for
> >>>>>>> the
> >>>>>>>>>> task
> >>>>>>>>>>> idling status.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Boyang
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
> >>>>>>>> wcarlson@confluent.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thank you for the comments everyone!
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think there are a few things I can clear up in general then
> I
> >>>>>>>> will
> >>>>>>>>>>>> specifically respond to each question.
> >>>>>>>>>>>>
> >>>>>>>>>>>> First, when I say "idling" I refer to task idling. Where the
> >>>>>>> stream
> >>>>>>>>> is
> >>>>>>>>>>>> intentionally not making progress. (
> >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10091 is an
> >>>>>>> example).
> >>>>>>>>> This
> >>>>>>>>>>>> becomes relevant if a task is waiting on one partition with no
> >>>>>>> data
> >>>>>>>>> but
> >>>>>>>>>>>> that is holding up a partition with data. That would cause one
> >>>>>>> just
> >>>>>>>>>>> looking
> >>>>>>>>>>>> at the committed offset changes to believe the task has a
> >>>>>> problem
> >>>>>>>>> when
> >>>>>>>>>> it
> >>>>>>>>>>>> is working as intended.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In light of this confusion, I plan to change tasksIdling() to
> >>>>>>>>>>> `Map<TaskId,
> >>>>>>>>>>>> TaskStatus> getTasksStatus()` this should hopefully make it
> >>>>>> more
> >>>>>>>>> clear
> >>>>>>>>>>> what
> >>>>>>>>>>>> is being exposed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> TaskStatus would include: TopicPartitions, TaskId,
> >>>>>>> ProcessorTopology,
> >>>>>>>>>>> Idling,
> >>>>>>>>>>>> and State.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Boyang:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2) I think that each task should report on whatever
> >>>>>>> TopicPartitions
> >>>>>>>>>> they
> >>>>>>>>>>>> hold, this means a Topic Partition might get reported twice
> but
> >>>>>>> the
> >>>>>>>>>> user
> >>>>>>>>>>>> can roll those up and use the larger one when looking at the
> >>>>>>> whole
> >>>>>>>>> app.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4) If the user collects the committed offsets across all the
> >>>>>>>> running
> >>>>>>>>>>>> clients there shouldn't be any tasks missing correct?
> >>>>>>>>>>>>
> >>>>>>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and
> >>>>>>>>>> TopicPartitions I
> >>>>>>>>>>>> think it is cleaner to report them separately.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1) Yes, that was my original plan but it made more sense to
> >>>>>>> mirror
> >>>>>>>>> how
> >>>>>>>>>>> the
> >>>>>>>>>>>> consumer exposes the committed offset.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3) That is a good point. I think that we should include
> >>>>>> internal
> >>>>>>>>> topics
> >>>>>>>>>>> as
> >>>>>>>>>>>> well. I think that if the topology were to evolve there should
> >>>>>> be
> >>>>>>>>> fair
> >>>>>>>>>>>> warning anyways. Maybe you can clarify what would be limited
> by
> >>>>>>>>>> exposing
> >>>>>>>>>>>> the interior topics here? I thought a user could find them in
> >>>>>>> other
> >>>>>>>>>> ways.
> >>>>>>>>>>>> If it is the name we could anonymize them before exposing them.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you all for your comments. If I did not respond directly
> >>>>>> to
> >>>>>>>> one
> >>>>>>>>>> of
> >>>>>>>>>>>> your questions I updated the kip to include the details it was
> >>>>>>>>>>> requesting.
> >>>>>>>>>>>> I did not include my proposed changes mentioned earlier as
> I
> >>>>>>>> would
> >>>>>>>>>>> like
> >>>>>>>>>>>> to get some feedback about what to include in TaskStatus and
> in
> >>>>>>>>>> general.
> >>>>>>>>>>>>
> >>>>>>>>>>>> best,
> >>>>>>>>>>>> Walker
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <
> >>>>>>> wangguoz@gmail.com
> >>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) Have you considered just relying on the
> >>>>>>>> `KafkaStreams#metrics()`
> >>>>>>>>>>> that
> >>>>>>>>>>>>> includes embedded consumer metrics that have the committed
> >>>>>>>> offsets
> >>>>>>>>>>>>> instead of adding a new API? Not advocating that this is a
> >>>>>>> better
> >>>>>>>>>>>> approach
> >>>>>>>>>>>>> but want to make sure we considered all options before we
> >>>>>> come
> >>>>>>> to
> >>>>>>>>> the
> >>>>>>>>>>>> "last
> >>>>>>>>>>>>> resort" of adding new public interfaces.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but
> >>>>>>> the
> >>>>>>>>>>> returned
> >>>>>>>>>>>>> map is on partitions. I think we should make the javadoc and
> >>>>>>> the
> >>>>>>>>>> return
> >>>>>>>>>>>>> types consistent, either tasks or topic partitions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3) In addition, if for 2) above we ended up with topic
> >>>>>>>> partitions,
> >>>>>>>>>> then
> >>>>>>>>>>>>> would they include only external source topics, or also
> >>>>>>> including
> >>>>>>>>>>>> internal
> >>>>>>>>>>>>> repartition / changelog topics? I think including only
> >>>>>> external
> >>>>>>>>>> source
> >>>>>>>>>>>>> topic partitions are not sufficient for your goal of tracking
> >>>>>>>>>> progress,
> >>>>>>>>>>>> but
> >>>>>>>>>>>>> exposing internal topic names are also a big commitment here
> >>>>>>> for
> >>>>>>>>>> future
> >>>>>>>>>>>>> topology evolution.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4)  For "tasksIdling", I'm wondering if we can make it more
> >>>>>>>>> general,
> >>>>>>>>>>> that
> >>>>>>>>>>>>> the returned value is not just a boolean, but a TaskState
> >>>>>> that
> >>>>>>>> can
> >>>>>>>>> be
> >>>>>>>>>>> an
> >>>>>>>>>>>>> enum of "created, restoring, running, idle, closing". This
> >>>>>>> could
> >>>>>>>>> help
> >>>>>>>>>>> us
> >>>>>>>>>>>> in
> >>>>>>>>>>>>> the future to track other things like restoration efficiency
> >>>>>>> and
> >>>>>>>>>>>> rebalance
> >>>>>>>>>>>>> efficiency etc.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 5) We need to clarify how is "idling" being defined here:
> >>>>>> e.g.
> >>>>>>> we
> >>>>>>>>> can
> >>>>>>>>>>>>> clearly state that a task is considered idle only if 1) lag
> >>>>>> is
> >>>>>>>>>>>>> increasing, indicating that there are indeed new records
> >>>>>>> arrived
> >>>>>>>> at
> >>>>>>>>>>>> source,
> >>>>>>>>>>>>> while committed offset is not advancing, AND 2) produced
> >>>>>> offset
> >>>>>>>>>>> (imagine
> >>>>>>>>>>>> we
> >>>>>>>>>>>>> may have punctuations that generate new data to the output
> >>>>>>> topic
> >>>>>>>>> even
> >>>>>>>>>>> if
> >>>>>>>>>>>>> there's no input for a while) is not advancing either.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
> >>>>>>>>>>> reluctanthero104@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely
> >>>>>>>>> empower
> >>>>>>>>>>>>> KStream
> >>>>>>>>>>>>>> users with better visibility.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Meanwhile I got a couple of questions/suggestions:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. typo "repost/report" in the motivation section.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. What offsets do we report when the task is under
> >>>>>>> restoration
> >>>>>>>>> or
> >>>>>>>>>>>>>> rebalancing?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3. IIUC, we should clearly state that our reported metrics
> >>>>>>> are
> >>>>>>>>>> based
> >>>>>>>>>>>> off
> >>>>>>>>>>>>>> locally assigned tasks for each instance.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4. In the meantime, what’s our strategy to report tasks
> >>>>>> that
> >>>>>>>> are
> >>>>>>>>>> not
> >>>>>>>>>>>>> local
> >>>>>>>>>>>>>> to the instance? Users would normally try to monitor all
> >>>>>> the
> >>>>>>>>>> possible
> >>>>>>>>>>>>>> tasks, and it’s unfortunate we couldn’t determine whether
> >>>>>> we
> >>>>>>>> have
> >>>>>>>>>>> lost
> >>>>>>>>>>>>>> tasks. My brainstorming was whether it makes sense for the
> >>>>>>>> leader
> >>>>>>>>>>>>> instance
> >>>>>>>>>>>>>> to report the task progress as -1 for all “supposed to be
> >>>>>>>>> running”
> >>>>>>>>>>>> tasks,
> >>>>>>>>>>>>>> so that on the metrics collector side it could catch any
> >>>>>>>> missing
> >>>>>>>>>>> tasks.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 5. It seems not clear how users should use `isTaskIdling`.
> >>>>>>> Why
> >>>>>>>>> not
> >>>>>>>>>>>>> report a
> >>>>>>>>>>>>>> map/set for idling tasks just as what we did for committed
> >>>>>>>>> offsets?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 6. Why do we use TopicPartition instead of TaskId as the
> >>>>>> key
> >>>>>>> in
> >>>>>>>>> the
> >>>>>>>>>>>>>> returned map?
> >>>>>>>>>>>>>> 7. Could we include some details in where we got the commit
> >>>>>>>>> offsets
> >>>>>>>>>>> for
> >>>>>>>>>>>>>> each task? Is it through consumer offset fetch, or the
> >>>>>> stream
> >>>>>>>>>>>> processing
> >>>>>>>>>>>>>> progress based on the records fetched?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
> >>>>>>>>>>> wcarlson@confluent.io>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I would like to start discussion on KIP-715. This kip
> >>>>>> aims
> >>>>>>> to
> >>>>>>>>>> make
> >>>>>>>>>>> it
> >>>>>>>>>>>>>>> easier to monitor Kafka Streams progress by exposing the
> >>>>>>>>>> committed
> >>>>>>>>>>>>> offset
> >>>>>>>>>>>>>>> in a similar way as the consumer client does.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Here is the KIP:
> >>>>>>>> https://cwiki.apache.org/confluence/x/aRRRCg
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Walker
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Walker,

Thank you for the KIP!

I somehow agree that we should document that some tasks may be missing.

I have one question/comment. As far as I understand, your KIP adds two
methods that return data that is actually hosted on the brokers, namely
committedOffsets() and endOffsets(). Thus, we need a remote call to
fetch the data, and consequently the cost of calling
localThreadMetadata() might increase substantially. I understand that
for committedOffsets() we could avoid the remote call by maintaining
the committed offsets locally, but can we also avoid the remote call
for endOffsets()? Should we allow users to pass a parameter to
localThreadMetadata() that skips the metadata that needs remote calls,
to keep the cost low for use cases that do not need the end offsets?
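
To make the cost concern concrete, here is a minimal Java sketch. It is not
the KIP's implementation. It assumes committed offsets are recorded locally
whenever a commit succeeds, and that end offsets come from a caller-supplied
lookup that stands in for the remote call. TaskOffsetsView, onCommit(),
snapshot() and the includeEndOffsets flag are hypothetical names, and
partitions are plain strings so the sketch has no Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch: none of these names exist in Kafka Streams.
final class TaskOffsetsView {

    /** Immutable result of one metadata query. */
    static final class Snapshot {
        final Map<String, Long> committedOffsets;
        final Map<String, Long> endOffsets; // empty when the remote lookup was skipped

        Snapshot(Map<String, Long> committedOffsets, Map<String, Long> endOffsets) {
            this.committedOffsets = Map.copyOf(committedOffsets);
            this.endOffsets = Map.copyOf(endOffsets);
        }
    }

    // Updated locally on every successful commit, so reading it needs no broker round trip.
    private final Map<String, Long> committed = new HashMap<>();
    // Stand-in for the remote end-offset lookup, which is assumed to be expensive.
    private final Function<Set<String>, Map<String, Long>> endOffsetLookup;

    TaskOffsetsView(Function<Set<String>, Map<String, Long>> endOffsetLookup) {
        this.endOffsetLookup = endOffsetLookup;
    }

    synchronized void onCommit(String partition, long offset) {
        committed.put(partition, offset);
    }

    // The lookup is only invoked when the caller explicitly asks for end offsets.
    synchronized Snapshot snapshot(boolean includeEndOffsets) {
        Map<String, Long> end = includeEndOffsets
                ? endOffsetLookup.apply(Set.copyOf(committed.keySet()))
                : Map.of();
        return new Snapshot(committed, end);
    }
}
```

With this shape, a caller that only needs committed offsets uses
snapshot(false) and never triggers the lookup. Whether such a flag belongs
on the public API is exactly the open question above.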

Best,
Bruno

On 02.03.21 02:18, Matthias J. Sax wrote:
>> but the user should
>> not rely on all tasks being returned at any given time to begin with since
>> it's possible we are in between revoking and re-assigning a partition.
> 
> Exactly. That is what I meant: the "hand off" phase of partitions during
> a rebalance. During this phase, some tasks are "missing" if you
> aggregate the information globally. My point was (even if it might be
> obvious to us) that it seems to be worth pointing out in the KIP and in
> the docs.
> 
> I meant "partial information" from a global POV (not partial for a
> single local instance).
> 
>> Also I mention that they return the highest value they had seen
>> so far for any tasks they have assigned to them.
> 
> For the shutdown case maybe, but after a task is closed its metadata
> should not be returned any longer IMHO.
> 
> 
> -Matthias
> 
> On 3/1/21 4:46 PM, Walker Carlson wrote:
>> I updated to use Optional, good idea Matthias.
>>
>> For the localThreadMetadata, it could already be called during a
>> rebalance. Also I mention that they return the highest value they had seen
>> so far for any tasks they have assigned to them. I thought it would be
>> useful to see the TaskMetadata while the Threads were shutting down. I
>> think that there shouldn't really be partial information. If you think this
>> should be clarified better let me know.
>>
>> walker
>>
>> On Mon, Mar 1, 2021 at 3:45 PM Sophie Blee-Goldman <so...@confluent.io>
>> wrote:
>>
>>> Can you clarify your second question Matthias? If this is queried during
>>> a cooperative rebalance, it should return the tasks as usual. If the user
>>> is
>>> using eager rebalancing then this will not return any tasks, but the user
>>> should
>>> not rely on all tasks being returned at any given time to begin with since
>>> it's
>>> possible we are in between revoking and re-assigning a partition.
>>>
>>> What does "partial information" mean?
>>>
>>> (btw I agree that an Optional makes sense for timeCurrentIdlingStarted())
>>>
>>> On Mon, Mar 1, 2021 at 11:46 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> Thanks for updating the KIP, Walker.
>>>>
>>>> About, `timeCurrentIdlingStarted()`: should we return an `Optional`
>>>> instead of `-1` if a task is not idling.
>>>>
>>>>
>>>> As we allow to call `localThreadMetadata()` any time, could it be that
>>>> we report partial information during a rebalance? If yes, this should be
>>>> pointed out, because if one want to implement a health check this needs
>>>> to be taken into account.
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 2/27/21 11:32 AM, Walker Carlson wrote:
>>>>> Sure thing Boyang,
>>>>>
>>>>> 1) it is in proposed changes. I expanded on it a bit more now.
>>>>> 2) done
>>>>> 3) and done :)
>>>>>
>>>>> thanks for the suggestions,
>>>>> walker
>>>>>
>>>>> On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <
>>> reluctanthero104@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Walker. Some minor comments:
>>>>>>
>>>>>> 1. Could you add a reference to localThreadMetadata method in the KIP?
>>>>>> 2. Could you make the code block as a java template, such that
>>>>>> TaskMetadata.java could be as the template title? Also it would be
>>> good
>>>> to
>>>>>> add some meta comments about the newly added functions.
>>>>>> 3. Could you write more details about rejected alternatives? Just as
>>>> why we
>>>>>> don't choose to expose as metrics, and how a new method on KStream is
>>>> not
>>>>>> favorable. These would be valuable when we look back on our design
>>>>>> decisions.
>>>>>>
>>>>>> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <
>>> wcarlson@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> I understand now. I think that is a valid concern but I think it is
>>>> best
>>>>>>> solved by having an external service verify through streams. As this
>>>> KIP
>>>>>>> is now just adding fields to TaskMetadata to be returned in the
>>>>>>> threadMetadata I am going to say that is out of scope.
>>>>>>>
>>>>>>> That seems to be the last concern. If there are no others I will put
>>>> this
>>>>>>> up for a vote soon.
>>>>>>>
>>>>>>> walker
>>>>>>>
>>>>>>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <
>>>> reluctanthero104@gmail.com
>>>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> For the 3rd point, yes, what I'm proposing is an edge case. For
>>>>>> example,
>>>>>>>> when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing
>>>>>> logic
>>>>>>>> causing no one gets 1_1 assigned. Then the health check service will
>>>>>> only
>>>>>>>> see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
>>>>>> paying
>>>>>>>> attention to 1_1. What I want to expose is a "logical global" view
>>> of
>>>>>> all
>>>>>>>> the tasks through the stream instance, since each instance gets the
>>>>>>>> assigned topology and should be able to infer all the exact tasks to
>>>> be
>>>>>>> up
>>>>>>>> and running when the service is healthy.
>>>>>>>>
>>>>>>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <
>>>> wcarlson@confluent.io
>>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the follow up Boyang and Guozhang,
>>>>>>>>>
>>>>>>>>> I have updated the kip to include these ideas.
>>>>>>>>>
>>>>>>>>> Guozhang, that is a good idea about using the TaskMetadata. We can
>>>>>> get
>>>>>>> it
>>>>>>>>> through the ThreadMetadata with a minor change to
>>>>>> `localThreadMetadata`
>>>>>>>> in
>>>>>>>>> kafkaStreams. This means that we will only need to update
>>>>>> TaskMetadata
>>>>>>>> and
>>>>>>>>> add no other APIs
>>>>>>>>>
>>>>>>>>> Boyang, since each TaskMetadata contains the TaskId and
>>>>>>> TopicPartitions I
>>>>>>>>> don't believe mapping either way will be a problem. Also I think we
>>>>>> can
>>>>>>>> do
>>>>>>>>> something like record the time the task started idling and when it
>>>>>>> stops
>>>>>>>>> idling we can override it to -1. I think that should clear up the
>>>>>> first
>>>>>>>> two
>>>>>>>>> points.
>>>>>>>>>
>>>>>>>>> As for your third point I am not sure I 100% understand. The
>>>>>>>> ThreadMetadata
>>>>>>>>> will contain a set of all task assigned to that thread. Any health
>>>>>>> check
>>>>>>>>> service will just need to query all clients and aggregate their
>>>>>>> responses
>>>>>>>>> to get a complete picture of all tasks correct?
>>>>>>>>>
>>>>>>>>> walker
>>>>>>>>>
>>>>>>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wa...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest
>>> we
>>>>>>>>>> consolidate on the existing `TaskMetadata` since we have already
>>>>>>>>>> accumulated a bunch of such classes, and it's better to keep them
>>>>>>> small
>>>>>>>> as
>>>>>>>>>> public APIs. You can see
>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12370
>>>>>>>>>> for a reference and a proposal.
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
>>>>>>>> reluctanthero104@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the updates Walker. Some replies and follow-up
>>>>>>> questions:
>>>>>>>>>>>
>>>>>>>>>>> 1. I agree one task could have multiple partitions, but when we
>>>>>>> hit a
>>>>>>>>>> delay
>>>>>>>>>>> in terms of offset progress, do we have a convenient way to
>>>>>> reverse
>>>>>>>>>> mapping
>>>>>>>>>>> TopicPartition to the problematic task? In production, I believe
>>>>>> it
>>>>>>>>> would
>>>>>>>>>>> be much quicker to identify the problem using task.id instead of
>>>>>>>> topic
>>>>>>>>>>> partition, especially when it points to an internal topic. I
>>>>>> think
>>>>>>>>> having
>>>>>>>>>>> the task id as part of the entry value seems useful, which means
>>>>>>>>> getting
>>>>>>>>>>> something like Map<TopicPartition, TaskProgress> where
>>>>>> TaskProgress
>>>>>>>>>>> contains both committed offsets & task id.
>>>>>>>>>>>
>>>>>>>>>>> 2. The task idling API was still confusing. I don't think we care
>>>>>>>> about
>>>>>>>>>> the
>>>>>>>>>>> exact state when making tasksIdling() query, instead we care more
>>>>>>>> about
>>>>>>>>>> how
>>>>>>>>>>> long one task has been in idle state since when you called, which
>>>>>>>>>> reflects
>>>>>>>>>>> whether it is a normal idling period. So I feel it might be
>>>>>> helpful
>>>>>>>> to
>>>>>>>>>>> track that time difference and report it in the TaskStatus
>>>>>> struct.
>>>>>>>>>>>
>>>>>>>>>>> 3. What I want to achieve to have some global mapping of either
>>>>>>>>>>> TopicPartition or TaskId was that it is not possible for a health
>>>>>>>> check
>>>>>>>>>>> service to report a task failure that doesn't emit any metrics.
>>>>>> So
>>>>>>> as
>>>>>>>>>> long
>>>>>>>>>>> as we have a global topic partition API, health check could
>>>>>> always
>>>>>>> be
>>>>>>>>>> aware
>>>>>>>>>>> of any task/partition not reporting its progress, does that make
>>>>>>>> sense?
>>>>>>>>>> If
>>>>>>>>>>> you feel we have a better way to achieve this, such as querying
>>>>>> all
>>>>>>>> the
>>>>>>>>>>> input/intermediate topic metadata directly from Kafka for the
>>>>>>>>> baseline, I
>>>>>>>>>>> think that should be good as well and worth mentioning it in the
>>>>>>> KIP.
>>>>>>>>>>>
>>>>>>>>>>> Also it seems that the KIP hasn't reflected what you proposed for
>>>>>>> the
>>>>>>>>>> task
>>>>>>>>>>> idling status.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Boyang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
>>>>>>>> wcarlson@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the comments everyone!
>>>>>>>>>>>>
>>>>>>>>>>>> I think there are a few things I can clear up in general then I
>>>>>>>> will
>>>>>>>>>>>> specifically respond to each question.
>>>>>>>>>>>>
>>>>>>>>>>>> First, when I say "idling" I refer to task idling. Where the
>>>>>>> stream
>>>>>>>>> is
>>>>>>>>>>>> intentionally not making progress. (
>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10091 is an
>>>>>>> example).
>>>>>>>>> This
>>>>>>>>>>>> becomes relevant if a task is waiting on one partition with no
>>>>>>> data
>>>>>>>>> but
>>>>>>>>>>>> that is holding up a partition with data. That would cause one
>>>>>>> just
>>>>>>>>>>> looking
>>>>>>>>>>>> at the committed offset changes to believe the task has a
>>>>>> problem
>>>>>>>>> when
>>>>>>>>>> it
>>>>>>>>>>>> is working as intended.
>>>>>>>>>>>>
>>>>>>>>>>>> In light of this confusion, I plan to change tasksIdling() to
>>>>>>>>>>> `Map<TaskId,
>>>>>>>>>>>> TaskStatus> getTasksStatus()` this should hopefully make it
>>>>>> more
>>>>>>>>> clear
>>>>>>>>>>> what
>>>>>>>>>>>> is being exposed.
>>>>>>>>>>>>
>>>>>>>>>>>> TaskStatus would include: TopicPartitions, TaskId,
>>>>>>> ProcessorTopology,
>>>>>>>>>>> Idling,
>>>>>>>>>>>> and State.
>>>>>>>>>>>>
>>>>>>>>>>>> Boyang:
>>>>>>>>>>>>
>>>>>>>>>>>> 2) I think that each task should report on whatever
>>>>>>> TopicPartitions
>>>>>>>>>> they
>>>>>>>>>>>> hold, this means a Topic Partition might get reported twice but
>>>>>>> the
>>>>>>>>>> user
>>>>>>>>>>>> can roll those up and use the larger one when looking at the
>>>>>>> whole
>>>>>>>>> app.
>>>>>>>>>>>>
>>>>>>>>>>>> 4) If the user collects the committed offsets across all the
>>>>>>>> running
>>>>>>>>>>>> clients there shouldn't be any tasks missing correct?
>>>>>>>>>>>>
>>>>>>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and
>>>>>>>>>> TopicPartitions I
>>>>>>>>>>>> think it is cleaner to report them separately.
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) Yes, that was my original plan but it made more sense to
>>>>>>> mirror
>>>>>>>>> how
>>>>>>>>>>> the
>>>>>>>>>>>> consumer exposes the committed offset.
>>>>>>>>>>>>
>>>>>>>>>>>> 3) That is a good point. I think that we should include
>>>>>> internal
>>>>>>>>> topics
>>>>>>>>>>> as
>>>>>>>>>>>> well. I think that if the topology were to evolve there should
>>>>>> be
>>>>>>>>> fair
>>>>>>>>>>>> warning anyways. Maybe you can clarify what would be limited by
>>>>>>>>>> exposing
>>>>>>>>>>>> the interior topics here? I thought a user could find them in
>>>>>>> other
>>>>>>>>>> ways.
>>>>>>>>>>>> If it is the name we could anonymize them before exposing them.
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you all for your comments. If I did not respond directly
>>>>>> to
>>>>>>>> one
>>>>>>>>>> of
>>>>>>>>>>>> your questions I updated the kip to include the details it was
>>>>>>>>>>> requesting.
>>>>>>>>>>>> I did not include my proposed changes mentioned earlier as I
>>>>>>>> would
>>>>>>>>>>> like
>>>>>>>>>>>> to get some feedback about what to include in TaskStatus and in
>>>>>>>>>> general.
>>>>>>>>>>>>
>>>>>>>>>>>> best,
>>>>>>>>>>>> Walker
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <
>>>>>>> wangguoz@gmail.com
>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) Have you considered just relying on the
>>>>>>>> `KafkaStreams#metrics()`
>>>>>>>>>>> that
>>>>>>>>>>>>> includes embedded consumer metrics that have the committed
>>>>>>>> offsets
>>>>>>>>>>>>> instead of adding a new API? Not advocating that this is a
>>>>>>> better
>>>>>>>>>>>> approach
>>>>>>>>>>>>> but want to make sure we considered all options before we
>>>>>> come
>>>>>>> to
>>>>>>>>> the
>>>>>>>>>>>> "last
>>>>>>>>>>>>> resort" of adding new public interfaces.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but
>>>>>>> the
>>>>>>>>>>> returned
>>>>>>>>>>>>> map is on partitions. I think we should make the javadoc and
>>>>>>> the
>>>>>>>>>> return
>>>>>>>>>>>>> types consistent, either tasks or topic partitions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3) In addition, if for 2) above we ended up with topic
>>>>>>>> partitions,
>>>>>>>>>> then
>>>>>>>>>>>>> would they include only external source topics, or also
>>>>>>> including
>>>>>>>>>>>> internal
>>>>>>>>>>>>> repartition / changelog topics? I think including only
>>>>>> external
>>>>>>>>>> source
>>>>>>>>>>>>> topic partitions are not sufficient for your goal of tracking
>>>>>>>>>> progress,
>>>>>>>>>>>> but
>>>>>>>>>>>>> exposing internal topic names are also a big commitment here
>>>>>>> for
>>>>>>>>>> future
>>>>>>>>>>>>> topology evolution.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4)  For "tasksIdling", I'm wondering if we can make it more
>>>>>>>>> general,
>>>>>>>>>>> that
>>>>>>>>>>>>> the returned value is not just a boolean, but a TaskState
>>>>>> that
>>>>>>>> can
>>>>>>>>> be
>>>>>>>>>>> an
>>>>>>>>>>>>> enum of "created, restoring, running, idle, closing". This
>>>>>>> could
>>>>>>>>> help
>>>>>>>>>>> us
>>>>>>>>>>>> in
>>>>>>>>>>>>> the future to track other things like restoration efficiency
>>>>>>> and
>>>>>>>>>>>> rebalance
>>>>>>>>>>>>> efficiency etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 5) We need to clarify how is "idling" being defined here:
>>>>>> e.g.
>>>>>>> we
>>>>>>>>> can
>>>>>>>>>>>>> clearly state that a task is considered idle only if 1) lag
>>>>>> is
>>>>>>>>>>>>> increasing, indicating that there are indeed new records
>>>>>>> arrived
>>>>>>>> at
>>>>>>>>>>>> source,
>>>>>>>>>>>>> while committed offset is not advancing, AND 2) produced
>>>>>> offset
>>>>>>>>>>> (imagine
>>>>>>>>>>>> we
>>>>>>>>>>>>> may have punctuations that generate new data to the output
>>>>>>> topic
>>>>>>>>> even
>>>>>>>>>>> if
>>>>>>>>>>>>> there's no input for a while) is not advancing either.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
>>>>>>>>>>> reluctanthero104@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely
>>>>>>>>> empower
>>>>>>>>>>>>> KStream
>>>>>>>>>>>>>> users with better visibility.
>>>>>>>>>>>>>>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by "Matthias J. Sax" <mj...@apache.org>.
> but the user should
> not rely on all tasks being returned at any given time to begin with since
> it's possible we are in between revoking and re-assigning a partition.

Exactly. That is what I meant: the "hand off" phase of partitions during
a rebalance. During this phase, some tasks are "missing" if you
aggregate the information globally. My point was (even if it might be
obvious to us) that it seems to be worth pointing out in the KIP and in
the docs.

I meant "partial information" from a global POV (not partial for a
single local instance).
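
To make the health-check concern concrete, here is a rough sketch of a
checker that tolerates the hand-off window. Everything in it is made up
for illustration: `HandoffAwareHealthCheck`, the grace period, and the use
of plain string task ids are assumptions, not part of Kafka Streams or of
the KIP. It only flags a task once it has been absent from the aggregated
view for longer than the grace period.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not a Kafka Streams class.
final class HandoffAwareHealthCheck {
    // Task ids the operator expects to be running somewhere (assumed to be known).
    private final Set<String> expectedTasks;
    // How long a task may be absent from the global view before it is flagged.
    private final long graceMs;
    // Task id -> timestamp at which the task was first observed missing.
    private final Map<String, Long> missingSince = new HashMap<>();

    HandoffAwareHealthCheck(Set<String> expectedTasks, long graceMs) {
        this.expectedTasks = new HashSet<>(expectedTasks);
        this.graceMs = graceMs;
    }

    /**
     * Aggregates the task ids reported by every instance and returns the
     * expected tasks that have been missing for longer than the grace period.
     */
    Set<String> check(Iterable<Set<String>> reportedPerInstance, long nowMs) {
        Set<String> seen = new HashSet<>();
        for (Set<String> tasks : reportedPerInstance) {
            seen.addAll(tasks);
        }

        Set<String> unhealthy = new TreeSet<>();
        for (String task : expectedTasks) {
            if (seen.contains(task)) {
                // The task is reported again, so forget any earlier absence.
                missingSince.remove(task);
                continue;
            }
            // Remember the first time the task was missing; keep that time on later calls.
            long since = missingSince.computeIfAbsent(task, t -> nowMs);
            if (nowMs - since > graceMs) {
                unhealthy.add(task);
            }
        }
        return unhealthy;
    }
}
```

With a grace period longer than a typical rebalance, a task that is
briefly unassigned between revocation and re-assignment would not trip
the check, while a task that never comes back eventually would.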

> Also I mention that they return the highest value they had seen
> so far for any tasks they have assigned to them.

For the shutdown case maybe, but after a task is closed its metadata
should not be returned any longer IMHO.


-Matthias


Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Walker Carlson <wc...@confluent.io>.
I updated to use Optional, good idea Matthias.

For the localThreadMetadata, it could already be called during a
rebalance. Also I mention that they return the highest value they had seen
so far for any tasks they have assigned to them. I thought it would be
useful to see the TaskMetadata while the threads were shutting down. I
think that there shouldn't really be partial information. If you think this
should be clarified better, let me know.
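
For reference, a rough sketch of what the Optional-based accessor could
look like from a caller's side. `TaskIdleInfoSketch` and `idleDurationMs`
are stand-ins invented for this example; only the accessor name
`timeCurrentIdlingStarted()` comes from this discussion, and the real
TaskMetadata class may well look different.

```java
import java.util.Optional;

// Hypothetical stand-in for the idling field discussed for TaskMetadata;
// this is not the actual Kafka Streams class.
final class TaskIdleInfoSketch {
    private final String taskId;
    // Wall-clock time (ms) when the current idling period began; null when not idling.
    private final Long idlingStartedMs;

    TaskIdleInfoSketch(String taskId, Long idlingStartedMs) {
        this.taskId = taskId;
        this.idlingStartedMs = idlingStartedMs;
    }

    String taskId() {
        return taskId;
    }

    // Empty when the task is not idling, instead of a magic value such as -1.
    Optional<Long> timeCurrentIdlingStarted() {
        return Optional.ofNullable(idlingStartedMs);
    }

    // How long the task has been idling as of nowMs, or empty if it is not idling.
    Optional<Long> idleDurationMs(long nowMs) {
        return timeCurrentIdlingStarted().map(start -> nowMs - start);
    }
}
```

The caller then has to handle the "not idling" case explicitly rather
than remembering to compare against -1.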

walker

On Mon, Mar 1, 2021 at 3:45 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Can you clarify your second question Matthias? If this is queried during
> a cooperative rebalance, it should return the tasks as usual. If the user is
> using eager rebalancing then this will not return any tasks, but the user
> should not rely on all tasks being returned at any given time to begin with
> since it's possible we are in between revoking and re-assigning a partition.
>
> What does "partial information" mean?
>
> (btw I agree that an Optional makes sense for timeCurrentIdlingStarted())
>
> On Mon, Mar 1, 2021 at 11:46 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks the updating the KIP Walker.
> >
> > About, `timeCurrentIdlingStarted()`: should we return an `Optional`
> > instead of `-1` if a task is not idling.
> >
> >
> > As we allow to call `localThreadMetadata()` any time, could it be that
> > we report partial information during a rebalance? If yes, this should be
> > pointed out, because if one want to implement a health check this needs
> > to be taken into account.
> >
> > -Matthias
> >
> >
> > On 2/27/21 11:32 AM, Walker Carlson wrote:
> > > Sure thing Boyang,
> > >
> > > 1) it is in proposed changes. I expanded on it a bit more now.
> > > 2) done
> > > 3) and done :)
> > >
> > > thanks for the suggestions,
> > > walker
> > >
> > > On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <
> reluctanthero104@gmail.com>
> > > wrote:
> > >
> > >> Thanks Walker. Some minor comments:
> > >>
> > >> 1. Could you add a reference to localThreadMetadata method in the KIP?
> > >> 2. Could you make the code block as a java template, such that
> > >> TaskMetadata.java could be as the template title? Also it would be
> good
> > to
> > >> add some meta comments about the newly added functions.
> > >> 3. Could you write more details about rejected alternatives? Just as
> > why we
> > >> don't choose to expose as metrics, and how a new method on KStream is
> > not
> > >> favorable. These would be valuable when we look back on our design
> > >> decisions.
> > >>
> > >> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <
> wcarlson@confluent.io>
> > >> wrote:
> > >>
> > >>> I understand now. I think that is a valid concern but I think it is
> > best
> > >>> solved but having an external service verify through streams. As this
> > KIP
> > >>> is now just adding fields to TaskMetadata to be returned in the
> > >>> threadMetadata I am going to say that is out of scope.
> > >>>
> > >>> That seems to be the last concern. If there are no others I will put
> > this
> > >>> up for a vote soon.
> > >>>
> > >>> walker
> > >>>
> > >>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <
> > reluctanthero104@gmail.com
> > >>>
> > >>> wrote:
> > >>>
> > >>>> For the 3rd point, yes, what I'm proposing is an edge case. For
> > >> example,
> > >>>> when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing
> > >> logic
> > >>>> causing no one gets 1_1 assigned. Then the health check service will
> > >> only
> > >>>> see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
> > >> paying
> > >>>> attention to 1_1. What I want to expose is a "logical global" view
> of
> > >> all
> > >>>> the tasks through the stream instance, since each instance gets the
> > >>>> assigned topology and should be able to infer all the exact tasks to
> > be
> > >>> up
> > >>>> and running when the service is healthy.
> > >>>>
> > >>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <
> > wcarlson@confluent.io
> > >>>
> > >>>> wrote:
> > >>>>
> > >>>>> Thanks for the follow up Boyang and Guozhang,
> > >>>>>
> > >>>>> I have updated the kip to include these ideas.
> > >>>>>
> > >>>>> Guozhang, that is a good idea about using the TaskMetadata. We can
> > >> get
> > >>> it
> > >>>>> through the ThreadMetadata with a minor change to
> > >> `localThreadMetadata`
> > >>>> in
> > >>>>> kafkaStreams. This means that we will only need to update
> > >> TaskMetadata
> > >>>> and
> > >>>>> add no other APIs
> > >>>>>
> > >>>>> Boyang, since each TaskMetadata contains the TaskId and
> > >>> TopicPartitions I
> > >>>>> don't believe mapping either way will be a problem. Also I think we
> > >> can
> > >>>> do
> > >>>>> something like record the time the task started idling and when it
> > >>> stops
> > >>>>> idling we can override it to -1. I think that should clear up the
> > >> first
> > >>>> two
> > >>>>> points.
> > >>>>>
> > >>>>> As for your third point I am not sure I 100% understand. The
> > >>>> ThreadMetadata
> > >>>>> will contain a set of all task assigned to that thread. Any health
> > >>> check
> > >>>>> service will just need to query all clients and aggregate their
> > >>> responses
> > >>>>> to get a complete picture of all tasks correct?
> > >>>>>
> > >>>>> walker
> > >>>>>
> > >>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wa...@gmail.com>
> > >>>> wrote:
> > >>>>>
> > >>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest
> we
> > >>>>>> consolidate on the existing `TaskMetadata` since we have already
> > >>>>>> accumulated a bunch of such classes, and its better to keep them
> > >>> small
> > >>>> as
> > >>>>>> public APIs. You can see
> > >>>>> https://issues.apache.org/jira/browse/KAFKA-12370
> > >>>>>> for a reference and a proposal.
> > >>>>>>
> > >>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
> > >>>> reluctanthero104@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Thanks for the updates Walker. Some replies and follow-up
> > >>> questions:
> > >>>>>>>
> > >>>>>>> 1. I agree one task could have multiple partitions, but when we
> > >>> hit a
> > >>>>>> delay
> > >>>>>>> in terms of offset progress, do we have a convenient way to
> > >> reverse
> > >>>>>> mapping
> > >>>>>>> TopicPartition to the problematic task? In production, I believe
> > >> it
> > >>>>> would
> > >>>>>>> be much quicker to identify the problem using task.id instead of
> > >>>> topic
> > >>>>>>> partition, especially when it points to an internal topic. I
> > >> think
> > >>>>> having
> > >>>>>>> the task id as part of the entry value seems useful, which means
> > >>>>> getting
> > >>>>>>> something like Map<TopicPartition, TaskProgress> where
> > >> TaskProgress
> > >>>>>>> contains both committed offsets & task id.
> > >>>>>>>
> > >>>>>>> 2. The task idling API was still confusing. I don't think we care
> > >>>> about
> > >>>>>> the
> > >>>>>>> exact state when making tasksIdling()query, instead we care more
> > >>>> about
> > >>>>>> how
> > >>>>>>> long one task has been in idle state since when you called, which
> > >>>>>> reflects
> > >>>>>>> whether it is a normal idling period. So I feel it might be
> > >> helpful
> > >>>> to
> > >>>>>>> track that time difference and report it in the TaskStatus
> > >> struct.
> > >>>>>>>
> > >>>>>>> 3. What I want to achieve to have some global mapping of either
> > >>>>>>> TopicPartition or TaskId was that it is not possible for a health
> > >>>> check
> > >>>>>>> service to report a task failure that doesn't emit any metrics.
> > >> So
> > >>> as
> > >>>>>> long
> > >>>>>>> as we have a global topic partition API, health check could
> > >> always
> > >>> be
> > >>>>>> aware
> > >>>>>>> of any task/partition not reporting its progress, does that make
> > >>>> sense?
> > >>>>>> If
> > >>>>>>> you feel we have a better way to achieve this, such as querying
> > >> all
> > >>>> the
> > >>>>>>> input/intermediate topic metadata directly from Kafka for the
> > >>>>> baseline, I
> > >>>>>>> think that should be good as well and worth mentioning it in the
> > >>> KIP.
> > >>>>>>>
> > >>>>>>> Also it seems that the KIP hasn't reflected what you proposed for
> > >>> the
> > >>>>>> task
> > >>>>>>> idling status.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Boyang
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
> > >>>> wcarlson@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Thank you for the comments everyone!
> > >>>>>>>>
> > >>>>>>>> I think there are a few things I can clear up in general then I
> > >>>> will
> > >>>>>>>> specifically respond to each question.
> > >>>>>>>>
> > >>>>>>>> First, when I say "idling" I refer to task idling. Where the
> > >>> stream
> > >>>>> is
> > >>>>>>>> intentionally not making progress. (
> > >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10091 is an
> > >>> example).
> > >>>>> This
> > >>>>>>>> becomes relevant if a task is waiting on one partition with no
> > >>> data
> > >>>>> but
> > >>>>>>>> that is holding up a partition with data. That would cause one
> > >>> just
> > >>>>>>> looking
> > >>>>>>>> at the committed offset changes to believe the task has a
> > >> problem
> > >>>>> when
> > >>>>>> it
> > >>>>>>>> is working as intended.
> > >>>>>>>>
> > >>>>>>>> In light of this confusion. I plan to change tasksIdling() to
> > >>>>>>> `Map<TaskId,
> > >>>>>>>> TaskStatus> getTasksStatus()` this should hopefully make it
> > >> more
> > >>>>> clear
> > >>>>>>> what
> > >>>>>>>> is being exposed.
> > >>>>>>>>
> > >>>>>>>> TaskStatus would include: TopicPartions, TaskId,
> > >>> ProcessorTopology,
> > >>>>>>> Idling,
> > >>>>>>>> and State.
> > >>>>>>>>
> > >>>>>>>> Boyang:
> > >>>>>>>>
> > >>>>>>>> 2) I think that each task should report on whatever
> > >>> TopicPartitions
> > >>>>>> they
> > >>>>>>>> hold, this means a Topic Partition might get reported twice but
> > >>> the
> > >>>>>> user
> > >>>>>>>> can roll those up and use the larger one when looking at the
> > >>> whole
> > >>>>> app.
> > >>>>>>>>
> > >>>>>>>> 4) If the user collects the committed offsets across all the
> > >>>> running
> > >>>>>>>> clients there shouldn't be any tasks missing correct?
> > >>>>>>>>
> > >>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and
> > >>>>>> TopicPartitions I
> > >>>>>>>> think it is cleaner to report them separately.
> > >>>>>>>>
> > >>>>>>>> Guozhang:
> > >>>>>>>>
> > >>>>>>>> 1) Yes, that was my original plan but it made more sense to
> > >>> mirror
> > >>>>> how
> > >>>>>>> the
> > >>>>>>>> consumer exposes the committed offset.
> > >>>>>>>>
> > >>>>>>>> 3) That is a good point. I think that we should include
> > >> internal
> > >>>>> topics
> > >>>>>>> as
> > >>>>>>>> well. I think that if the topology were to evolve there should
> > >> be
> > >>>>> fair
> > >>>>>>>> warning anyways. Maybe you can clarify what would be limited by
> > >>>>>> exposing
> > >>>>>>>> the interior topics here? I thought a user could find them in
> > >>> other
> > >>>>>> ways.
> > >>>>>>>> If it is the name we could aynomise them before exposing them.
> > >>>>>>>>
> > >>>>>>>> Thank you all for your comments. If I did not respond directly
> > >> to
> > >>>> one
> > >>>>>> of
> > >>>>>>>> your questions I updated the kip to include the details it was
> > >>>>>>> requesting.
> > >>>>>>>> I didn't not include my proposed changes mentioned earlier as I
> > >>>> would
> > >>>>>>> like
> > >>>>>>>> to get some feedback about what to include in TaskStatus and in
> > >>>>>> general.
> > >>>>>>>>
> > >>>>>>>> best,
> > >>>>>>>> Walker
> > >>>>>>>>
> > >>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <
> > >>> wangguoz@gmail.com
> > >>>>>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
> > >>>>>>>>>
> > >>>>>>>>> 1) Have you considered just relying on the
> > >>>> `KafkaStreams#metrics()`
> > >>>>>>> that
> > >>>>>>>>> includes embedded consumer metrics that have the committed
> > >>>> offsets
> > >>>>>>>>> instead of adding a new API? Not advocating that this is a
> > >>> better
> > >>>>>>>> approach
> > >>>>>>>>> but want to make sure we considered all options before we
> > >> come
> > >>> to
> > >>>>> the
> > >>>>>>>> "last
> > >>>>>>>>> resort" of adding new public interfaces.
> > >>>>>>>>>
> > >>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but
> > >>> the
> > >>>>>>> returned
> > >>>>>>>>> map is on partitions. I think we should make the javadoc and
> > >>> the
> > >>>>>> return
> > >>>>>>>>> types consistent, either tasks or topic partitions.
> > >>>>>>>>>
> > >>>>>>>>> 3) In addition, if for 2) above we ended up with topic
> > >>>> partitions,
> > >>>>>> then
> > >>>>>>>>> would they include only external source topics, or also
> > >>> including
> > >>>>>>>> internal
> > >>>>>>>>> repartition / changelog topics? I think including only
> > >> external
> > >>>>>> source
> > >>>>>>>>> topic partitions are not sufficient for your goal of tracking
> > >>>>>> progress,
> > >>>>>>>> but
> > >>>>>>>>> exposing internal topic names are also a big commitment here
> > >>> for
> > >>>>>> future
> > >>>>>>>>> topology evolution.
> > >>>>>>>>>
> > >>>>>>>>> 4)  For "tasksIdling", I'm wondering if we can make it more
> > >>>>> general,
> > >>>>>>> that
> > >>>>>>>>> the returned value is not just a boolean, but a TaskState
> > >> that
> > >>>> can
> > >>>>> be
> > >>>>>>> an
> > >>>>>>>>> enum of "created, restoring, running, idle, closing". This
> > >>> could
> > >>>>> help
> > >>>>>>> us
> > >>>>>>>> in
> > >>>>>>>>> the future to track other things like restoration efficiency
> > >>> and
> > >>>>>>>> rebalance
> > >>>>>>>>> efficiency etc.
> > >>>>>>>>>
> > >>>>>>>>> 5) We need to clarify how is "idling" being defined here:
> > >> e.g.
> > >>> we
> > >>>>> can
> > >>>>>>>>> clearly state that a task is considered idle only if 1) lag
> > >> is
> > >>>>>>>>> increasing, indicating that there are indeed new records
> > >>> arrived
> > >>>> at
> > >>>>>>>> source,
> > >>>>>>>>> while committed offset is not advancing, AND 2) produced
> > >> offset
> > >>>>>>> (imagine
> > >>>>>>>> we
> > >>>>>>>>> may have punctuations that generate new data to the output
> > >>> topic
> > >>>>> even
> > >>>>>>> if
> > >>>>>>>>> there's no input for a while) is not advancing either.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
> > >>>>>>> reluctanthero104@gmail.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely
> > >>>>> empower
> > >>>>>>>>> KStream
> > >>>>>>>>>> users with better visibility.
> > >>>>>>>>>>
> > >>>>>>>>>> Meanwhile I got a couple of questions/suggestions:
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 1. typo "repost/report" in the motivation section.
> > >>>>>>>>>>
> > >>>>>>>>>> 2. What offsets do we report when the task is under
> > >>> restoration
> > >>>>> or
> > >>>>>>>>>> rebalancing?
> > >>>>>>>>>>
> > >>>>>>>>>> 3. IIUC, we should clearly state that our reported metrics
> > >>> are
> > >>>>>> based
> > >>>>>>>> off
> > >>>>>>>>>> locally assigned tasks for each instance.
> > >>>>>>>>>>
> > >>>>>>>>>> 4. In the meantime, what’s our strategy to report tasks
> > >> that
> > >>>> are
> > >>>>>> not
> > >>>>>>>>> local
> > >>>>>>>>>> to the instance? Users would normally try to monitor all
> > >> the
> > >>>>>> possible
> > >>>>>>>>>> tasks, and it’s unfortunate we couldn’t determine whether
> > >> we
> > >>>> have
> > >>>>>>> lost
> > >>>>>>>>>> tasks. My brainstorming was whether it makes sense for the
> > >>>> leader
> > >>>>>>>>> instance
> > >>>>>>>>>> to report the task progress as -1 for all “supposed to be
> > >>>>> running”
> > >>>>>>>> tasks,
> > >>>>>>>>>> so that on the metrics collector side it could catch any
> > >>>> missing
> > >>>>>>> tasks.
> > >>>>>>>>>>
> > >>>>>>>>>> 5. It seems not clear how users should use `isTaskIdling`.
> > >>> Why
> > >>>>> not
> > >>>>>>>>> report a
> > >>>>>>>>>> map/set for idling tasks just as what we did for committed
> > >>>>> offsets?
> > >>>>>>>>>>
> > >>>>>>>>>> 6. Why do we use TopicPartition instead of TaskId as the
> > >> key
> > >>> in
> > >>>>> the
> > >>>>>>>>>> returned map?
> > >>>>>>>>>> 7. Could we include some details in where we got the commit
> > >>>>> offsets
> > >>>>>>> for
> > >>>>>>>>>> each task? Is it through consumer offset fetch, or the
> > >> stream
> > >>>>>>>> processing
> > >>>>>>>>>> progress based on the records fetched?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
> > >>>>>>> wcarlson@confluent.io>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hello all,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I would like to start discussion on KIP-715. This kip
> > >> aims
> > >>> to
> > >>>>>> make
> > >>>>>>> it
> > >>>>>>>>>>> easier to monitor Kafka Streams progress by exposing the
> > >>>>>> committed
> > >>>>>>>>> offset
> > >>>>>>>>>>> in a similar way as the consumer client does.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Here is the KIP:
> > >>>> https://cwiki.apache.org/confluence/x/aRRRCg
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>> Walker
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> -- Guozhang
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Can you clarify your second question, Matthias? If this is queried during
a cooperative rebalance, it should return the tasks as usual. If the user is
using eager rebalancing, then this will not return any tasks, but the user
should not rely on all tasks being returned at any given time to begin with,
since it's possible we are in between revoking and re-assigning a partition.

What does "partial information" mean?

(btw I agree that an Optional makes sense for timeCurrentIdlingStarted())
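
To illustrate the Optional suggestion, here is a minimal Java sketch. It
assumes a hypothetical TaskIdleInfo class standing in for the proposed
TaskMetadata additions; it is not the real Kafka Streams class, and only the
timeCurrentIdlingStarted() name comes from this thread. Internally it still
keeps -1 for "not idling", but callers only ever see an Optional.

```java
import java.util.Optional;

// Hypothetical stand-in for the task metadata discussed in this thread;
// not the real org.apache.kafka.streams TaskMetadata class.
final class TaskIdleInfo {
    private final String taskId;
    private final long idleStartMs; // internal sentinel: negative means "not idling"

    TaskIdleInfo(String taskId, long idleStartMs) {
        this.taskId = taskId;
        this.idleStartMs = idleStartMs;
    }

    String taskId() {
        return taskId;
    }

    // Empty when the task is not idling, so a caller cannot mistake -1 for a timestamp.
    Optional<Long> timeCurrentIdlingStarted() {
        return idleStartMs < 0 ? Optional.empty() : Optional.of(idleStartMs);
    }

    // How long the task has been idle at nowMs; 0 if it is not idling.
    long idleDurationMs(long nowMs) {
        return timeCurrentIdlingStarted().map(start -> nowMs - start).orElse(0L);
    }
}
```

With a raw -1, a caller computing "now - start" would silently get a huge
bogus duration for a non-idling task; the Optional forces that case to be
handled explicitly.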

On Mon, Mar 1, 2021 at 11:46 AM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for updating the KIP, Walker.
>
> About `timeCurrentIdlingStarted()`: should we return an `Optional`
> instead of `-1` if a task is not idling?
>
>
> As we allow calling `localThreadMetadata()` at any time, could it be that
> we report partial information during a rebalance? If yes, this should be
> pointed out, because anyone who wants to implement a health check needs
> to take it into account.
>
> -Matthias
>
>
> On 2/27/21 11:32 AM, Walker Carlson wrote:
> > Sure thing Boyang,
> >
> > 1) it is in proposed changes. I expanded on it a bit more now.
> > 2) done
> > 3) and done :)
> >
> > thanks for the suggestions,
> > walker
> >
> > On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <re...@gmail.com>
> > wrote:
> >
> >> Thanks Walker. Some minor comments:
> >>
> >> 1. Could you add a reference to localThreadMetadata method in the KIP?
> >> 2. Could you make the code block as a java template, such that
> >> TaskMetadata.java could be as the template title? Also it would be good
> to
> >> add some meta comments about the newly added functions.
> >> 3. Could you write more details about rejected alternatives? Just as
> why we
> >> don't choose to expose as metrics, and how a new method on KStream is
> not
> >> favorable. These would be valuable when we look back on our design
> >> decisions.
> >>
> >> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wc...@confluent.io>
> >> wrote:
> >>
> >>> I understand now. I think that is a valid concern but I think it is best
> >>> solved by having an external service verify through streams. As this KIP
> >>> is now just adding fields to TaskMetadata to be returned in the
> >>> threadMetadata I am going to say that is out of scope.
> >>>
> >>> That seems to be the last concern. If there are no others I will put
> this
> >>> up for a vote soon.
> >>>
> >>> walker
> >>>
> >>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <
> reluctanthero104@gmail.com
> >>>
> >>> wrote:
> >>>
> >>>> For the 3rd point, yes, what I'm proposing is an edge case. For
> >> example,
> >>>> when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing
> >> logic
> >>>> causing no one gets 1_1 assigned. Then the health check service will
> >> only
> >>>> see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
> >> paying
> >>>> attention to 1_1. What I want to expose is a "logical global" view of
> >> all
> >>>> the tasks through the stream instance, since each instance gets the
> >>>> assigned topology and should be able to infer all the exact tasks to
> be
> >>> up
> >>>> and running when the service is healthy.
> >>>>
> >>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <
> wcarlson@confluent.io
> >>>
> >>>> wrote:
> >>>>
> >>>>> Thanks for the follow up Boyang and Guozhang,
> >>>>>
> >>>>> I have updated the kip to include these ideas.
> >>>>>
> >>>>> Guozhang, that is a good idea about using the TaskMetadata. We can
> >> get
> >>> it
> >>>>> through the ThreadMetadata with a minor change to
> >> `localThreadMetadata`
> >>>> in
> >>>>> kafkaStreams. This means that we will only need to update
> >> TaskMetadata
> >>>> and
> >>>>> add no other APIs
> >>>>>
> >>>>> Boyang, since each TaskMetadata contains the TaskId and
> >>> TopicPartitions I
> >>>>> don't believe mapping either way will be a problem. Also I think we
> >> can
> >>>> do
> >>>>> something like record the time the task started idling and when it
> >>> stops
> >>>>> idling we can override it to -1. I think that should clear up the
> >> first
> >>>> two
> >>>>> points.
> >>>>>
> >>>>> As for your third point I am not sure I 100% understand. The
> >>>> ThreadMetadata
> >>>>> will contain a set of all task assigned to that thread. Any health
> >>> check
> >>>>> service will just need to query all clients and aggregate their
> >>> responses
> >>>>> to get a complete picture of all tasks correct?
> >>>>>
> >>>>> walker
> >>>>>
> >>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wa...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest we
> >>>>>> consolidate on the existing `TaskMetadata` since we have already
> >>>>>> accumulated a bunch of such classes, and it's better to keep them
> >>> small
> >>>> as
> >>>>>> public APIs. You can see
> >>>>> https://issues.apache.org/jira/browse/KAFKA-12370
> >>>>>> for a reference and a proposal.
> >>>>>>
> >>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
> >>>> reluctanthero104@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thanks for the updates Walker. Some replies and follow-up
> >>> questions:
> >>>>>>>
> >>>>>>> 1. I agree one task could have multiple partitions, but when we
> >>> hit a
> >>>>>> delay
> >>>>>>> in terms of offset progress, do we have a convenient way to
> >> reverse
> >>>>>> mapping
> >>>>>>> TopicPartition to the problematic task? In production, I believe
> >> it
> >>>>> would
> >>>>>>> be much quicker to identify the problem using task.id instead of
> >>>> topic
> >>>>>>> partition, especially when it points to an internal topic. I
> >> think
> >>>>> having
> >>>>>>> the task id as part of the entry value seems useful, which means
> >>>>> getting
> >>>>>>> something like Map<TopicPartition, TaskProgress> where
> >> TaskProgress
> >>>>>>> contains both committed offsets & task id.
> >>>>>>>
> >>>>>>> 2. The task idling API was still confusing. I don't think we care
> >>>> about
> >>>>>> the
> >>>>>>> exact state when making tasksIdling() query, instead we care more
> >>>> about
> >>>>>> how
> >>>>>>> long one task has been in idle state since when you called, which
> >>>>>> reflects
> >>>>>>> whether it is a normal idling period. So I feel it might be
> >> helpful
> >>>> to
> >>>>>>> track that time difference and report it in the TaskStatus
> >> struct.
> >>>>>>>
> >>>>>>> 3. What I want to achieve to have some global mapping of either
> >>>>>>> TopicPartition or TaskId was that it is not possible for a health
> >>>> check
> >>>>>>> service to report a task failure that doesn't emit any metrics.
> >> So
> >>> as
> >>>>>> long
> >>>>>>> as we have a global topic partition API, health check could
> >> always
> >>> be
> >>>>>> aware
> >>>>>>> of any task/partition not reporting its progress, does that make
> >>>> sense?
> >>>>>> If
> >>>>>>> you feel we have a better way to achieve this, such as querying
> >> all
> >>>> the
> >>>>>>> input/intermediate topic metadata directly from Kafka for the
> >>>>> baseline, I
> >>>>>>> think that should be good as well and worth mentioning it in the
> >>> KIP.
> >>>>>>>
> >>>>>>> Also it seems that the KIP hasn't reflected what you proposed for
> >>> the
> >>>>>> task
> >>>>>>> idling status.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Boyang
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
> >>>> wcarlson@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thank you for the comments everyone!
> >>>>>>>>
> >>>>>>>> I think there are a few things I can clear up in general then I
> >>>> will
> >>>>>>>> specifically respond to each question.
> >>>>>>>>
> >>>>>>>> First, when I say "idling" I refer to task idling. Where the
> >>> stream
> >>>>> is
> >>>>>>>> intentionally not making progress. (
> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10091 is an
> >>> example).
> >>>>> This
> >>>>>>>> becomes relevant if a task is waiting on one partition with no
> >>> data
> >>>>> but
> >>>>>>>> that is holding up a partition with data. That would cause one
> >>> just
> >>>>>>> looking
> >>>>>>>> at the committed offset changes to believe the task has a
> >> problem
> >>>>> when
> >>>>>> it
> >>>>>>>> is working as intended.
> >>>>>>>>
> >>>>>>>> In light of this confusion, I plan to change tasksIdling() to
> >>>>>>> `Map<TaskId,
> >>>>>>>> TaskStatus> getTasksStatus()` this should hopefully make it
> >> more
> >>>>> clear
> >>>>>>> what
> >>>>>>>> is being exposed.
> >>>>>>>>
> >>>>>>>> TaskStatus would include: TopicPartitions, TaskId,
> >>> ProcessorTopology,
> >>>>>>> Idling,
> >>>>>>>> and State.
> >>>>>>>>
> >>>>>>>> Boyang:
> >>>>>>>>
> >>>>>>>> 2) I think that each task should report on whatever
> >>> TopicPartitions
> >>>>>> they
> >>>>>>>> hold, this means a Topic Partition might get reported twice but
> >>> the
> >>>>>> user
> >>>>>>>> can roll those up and use the larger one when looking at the
> >>> whole
> >>>>> app.
> >>>>>>>>
> >>>>>>>> 4) If the user collects the committed offsets across all the
> >>>> running
> >>>>>>>> clients there shouldn't be any tasks missing correct?
> >>>>>>>>
> >>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and
> >>>>>> TopicPartitions I
> >>>>>>>> think it is cleaner to report them separately.
> >>>>>>>>
> >>>>>>>> Guozhang:
> >>>>>>>>
> >>>>>>>> 1) Yes, that was my original plan but it made more sense to
> >>> mirror
> >>>>> how
> >>>>>>> the
> >>>>>>>> consumer exposes the committed offset.
> >>>>>>>>
> >>>>>>>> 3) That is a good point. I think that we should include
> >> internal
> >>>>> topics
> >>>>>>> as
> >>>>>>>> well. I think that if the topology were to evolve there should
> >> be
> >>>>> fair
> >>>>>>>> warning anyways. Maybe you can clarify what would be limited by
> >>>>>> exposing
> >>>>>>>> the interior topics here? I thought a user could find them in
> >>> other
> >>>>>> ways.
> >>>>>>>> If it is the name we could anonymize them before exposing them.
> >>>>>>>>
> >>>>>>>> Thank you all for your comments. If I did not respond directly
> >> to
> >>>> one
> >>>>>> of
> >>>>>>>> your questions I updated the kip to include the details it was
> >>>>>>> requesting.
> >>>>>>>> I did not include my proposed changes mentioned earlier as I
> >>>> would
> >>>>>>> like
> >>>>>>>> to get some feedback about what to include in TaskStatus and in
> >>>>>> general.
> >>>>>>>>
> >>>>>>>> best,
> >>>>>>>> Walker
> >>>>>>>>
> >>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <
> >>> wangguoz@gmail.com
> >>>>>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
> >>>>>>>>>
> >>>>>>>>> 1) Have you considered just relying on the
> >>>> `KafkaStreams#metrics()`
> >>>>>>> that
> >>>>>>>>> includes embedded consumer metrics that have the committed
> >>>> offsets
> >>>>>>>>> instead of adding a new API? Not advocating that this is a
> >>> better
> >>>>>>>> approach
> >>>>>>>>> but want to make sure we considered all options before we
> >> come
> >>> to
> >>>>> the
> >>>>>>>> "last
> >>>>>>>>> resort" of adding new public interfaces.
> >>>>>>>>>
> >>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but
> >>> the
> >>>>>>> returned
> >>>>>>>>> map is on partitions. I think we should make the javadoc and
> >>> the
> >>>>>> return
> >>>>>>>>> types consistent, either tasks or topic partitions.
> >>>>>>>>>
> >>>>>>>>> 3) In addition, if for 2) above we ended up with topic
> >>>> partitions,
> >>>>>> then
> >>>>>>>>> would they include only external source topics, or also
> >>> including
> >>>>>>>> internal
> >>>>>>>>> repartition / changelog topics? I think including only
> >> external
> >>>>>> source
> >>>>>>>>> topic partitions are not sufficient for your goal of tracking
> >>>>>> progress,
> >>>>>>>> but
> >>>>>>>>> exposing internal topic names are also a big commitment here
> >>> for
> >>>>>> future
> >>>>>>>>> topology evolution.
> >>>>>>>>>
> >>>>>>>>> 4)  For "tasksIdling", I'm wondering if we can make it more
> >>>>> general,
> >>>>>>> that
> >>>>>>>>> the returned value is not just a boolean, but a TaskState
> >> that
> >>>> can
> >>>>> be
> >>>>>>> an
> >>>>>>>>> enum of "created, restoring, running, idle, closing". This
> >>> could
> >>>>> help
> >>>>>>> us
> >>>>>>>> in
> >>>>>>>>> the future to track other things like restoration efficiency
> >>> and
> >>>>>>>> rebalance
> >>>>>>>>> efficiency etc.
> >>>>>>>>>
> >>>>>>>>> 5) We need to clarify how is "idling" being defined here:
> >> e.g.
> >>> we
> >>>>> can
> >>>>>>>>> clearly state that a task is considered idle only if 1) lag
> >> is
> >>>>>>>>> increasing, indicating that there are indeed new records
> >>> arrived
> >>>> at
> >>>>>>>> source,
> >>>>>>>>> while committed offset is not advancing, AND 2) produced
> >> offset
> >>>>>>> (imagine
> >>>>>>>> we
> >>>>>>>>> may have punctuations that generate new data to the output
> >>> topic
> >>>>> even
> >>>>>>> if
> >>>>>>>>> there's no input for a while) is not advancing either.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
> >>>>>>> reluctanthero104@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely
> >>>>> empower
> >>>>>>>>> KStream
> >>>>>>>>>> users with better visibility.
> >>>>>>>>>>
> >>>>>>>>>> Meanwhile I got a couple of questions/suggestions:
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 1. typo "repost/report" in the motivation section.
> >>>>>>>>>>
> >>>>>>>>>> 2. What offsets do we report when the task is under
> >>> restoration
> >>>>> or
> >>>>>>>>>> rebalancing?
> >>>>>>>>>>
> >>>>>>>>>> 3. IIUC, we should clearly state that our reported metrics
> >>> are
> >>>>>> based
> >>>>>>>> off
> >>>>>>>>>> locally assigned tasks for each instance.
> >>>>>>>>>>
> >>>>>>>>>> 4. In the meantime, what’s our strategy to report tasks
> >> that
> >>>> are
> >>>>>> not
> >>>>>>>>> local
> >>>>>>>>>> to the instance? Users would normally try to monitor all
> >> the
> >>>>>> possible
> >>>>>>>>>> tasks, and it’s unfortunate we couldn’t determine whether
> >> we
> >>>> have
> >>>>>>> lost
> >>>>>>>>>> tasks. My brainstorming was whether it makes sense for the
> >>>> leader
> >>>>>>>>> instance
> >>>>>>>>>> to report the task progress as -1 for all “supposed to be
> >>>>> running”
> >>>>>>>> tasks,
> >>>>>>>>>> so that on the metrics collector side it could catch any
> >>>> missing
> >>>>>>> tasks.
> >>>>>>>>>>
> >>>>>>>>>> 5. It seems not clear how users should use `isTaskIdling`.
> >>> Why
> >>>>> not
> >>>>>>>>> report a
> >>>>>>>>>> map/set for idling tasks just as what we did for committed
> >>>>> offsets?
> >>>>>>>>>>
> >>>>>>>>>> 6. Why do we use TopicPartition instead of TaskId as the
> >> key
> >>> in
> >>>>> the
> >>>>>>>>>> returned map?
> >>>>>>>>>> 7. Could we include some details in where we got the commit
> >>>>> offsets
> >>>>>>> for
> >>>>>>>>>> each task? Is it through consumer offset fetch, or the
> >> stream
> >>>>>>>> processing
> >>>>>>>>>> progress based on the records fetched?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
> >>>>>>> wcarlson@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello all,
> >>>>>>>>>>>
> >>>>>>>>>>> I would like to start discussion on KIP-715. This kip
> >> aims
> >>> to
> >>>>>> make
> >>>>>>> it
> >>>>>>>>>>> easier to monitor Kafka Streams progress by exposing the
> >>>>>> committed
> >>>>>>>>> offset
> >>>>>>>>>>> in a similar way as the consumer client does.
> >>>>>>>>>>>
> >>>>>>>>>>> Here is the KIP:
> >>>> https://cwiki.apache.org/confluence/x/aRRRCg
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Walker
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by "Matthias J. Sax" <mj...@apache.org>.
Thanks for updating the KIP, Walker.

About `timeCurrentIdlingStarted()`: should we return an `Optional`
instead of `-1` if a task is not idling?


As we allow calling `localThreadMetadata()` at any time, could it be that
we report partial information during a rebalance? If yes, this should be
pointed out, because anyone who wants to implement a health check needs
to take it into account.
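
One way a health check could take partial information into account is to
flag a task only after it has been absent from several consecutive polling
rounds. The Java sketch below assumes a hypothetical MissingTaskDetector
class, task ids as plain strings such as "0_0", and an expected task set
that the health check obtains on its own (for example from the topology and
partition counts). How each instance's task ids are fetched (for instance
via `localThreadMetadata()`) is not shown, and the threshold value is an
arbitrary illustration.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical health-check helper; not part of Kafka Streams.
final class MissingTaskDetector {
    private final Set<String> expectedTasks;
    private final int threshold; // consecutive rounds a task must be absent before it is flagged
    private final Map<String, Integer> missedRounds = new HashMap<>();

    MissingTaskDetector(Set<String> expectedTasks, int threshold) {
        this.expectedTasks = new HashSet<>(expectedTasks);
        this.threshold = threshold;
    }

    // reportedPerInstance: the task ids each instance returned in one polling round.
    // Returns the tasks that have been absent for at least `threshold` consecutive rounds.
    Set<String> poll(Collection<Set<String>> reportedPerInstance) {
        Set<String> seen = new HashSet<>();
        for (Set<String> tasks : reportedPerInstance) {
            seen.addAll(tasks);
        }
        Set<String> missing = new TreeSet<>();
        for (String task : expectedTasks) {
            if (seen.contains(task)) {
                missedRounds.remove(task); // task reappeared, reset its counter
            } else if (missedRounds.merge(task, 1, Integer::sum) >= threshold) {
                missing.add(task);
            }
        }
        return missing;
    }
}
```

A single round in which a task is not reported (for example while a
partition is between revocation and re-assignment) does not raise an alarm;
only a task that stays unreported across rounds is returned.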

-Matthias


On 2/27/21 11:32 AM, Walker Carlson wrote:
> Sure thing Boyang,
> 
> 1) it is in proposed changes. I expanded on it a bit more now.
> 2) done
> 3) and done :)
> 
> thanks for the suggestions,
> walker
> 
> On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <re...@gmail.com>
> wrote:
> 
>> Thanks Walker. Some minor comments:
>>
>> 1. Could you add a reference to localThreadMetadata method in the KIP?
>> 2. Could you make the code block as a java template, such that
>> TaskMetadata.java could be as the template title? Also it would be good to
>> add some meta comments about the newly added functions.
>> 3. Could you write more details about rejected alternatives? Just as why we
>> don't choose to expose as metrics, and how a new method on KStream is not
>> favorable. These would be valuable when we look back on our design
>> decisions.
>>
>> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wc...@confluent.io>
>> wrote:
>>
>>> I understand now. I think that is a valid concern but I think it is best
>>> solved by having an external service verify through streams. As this KIP
>>> is now just adding fields to TaskMetadata to be returned in the
>>> threadMetadata I am going to say that is out of scope.
>>>
>>> That seems to be the last concern. If there are no others I will put this
>>> up for a vote soon.
>>>
>>> walker
>>>
>>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <reluctanthero104@gmail.com
>>>
>>> wrote:
>>>
>>>> For the 3rd point, yes, what I'm proposing is an edge case. For
>> example,
>>>> when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing
>> logic
>>>> causing no one gets 1_1 assigned. Then the health check service will
>> only
>>>> see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
>> paying
>>>> attention to 1_1. What I want to expose is a "logical global" view of
>> all
>>>> the tasks through the stream instance, since each instance gets the
>>>> assigned topology and should be able to infer all the exact tasks to be
>>> up
>>>> and running when the service is healthy.
>>>>
>>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wcarlson@confluent.io
>>>
>>>> wrote:
>>>>
>>>>> Thanks for the follow up Boyang and Guozhang,
>>>>>
>>>>> I have updated the kip to include these ideas.
>>>>>
>>>>> Guozhang, that is a good idea about using the TaskMetadata. We can
>> get
>>> it
>>>>> through the ThreadMetadata with a minor change to
>> `localThreadMetadata`
>>>> in
>>>>> kafkaStreams. This means that we will only need to update
>> TaskMetadata
>>>> and
>>>>> add no other APIs
>>>>>
>>>>> Boyang, since each TaskMetadata contains the TaskId and
>>> TopicPartitions I
>>>>> don't believe mapping either way will be a problem. Also I think we
>> can
>>>> do
>>>>> something like record the time the task started idling and when it
>>> stops
>>>>> idling we can override it to -1. I think that should clear up the
>> first
>>>> two
>>>>> points.
>>>>>
>>>>> As for your third point I am not sure I 100% understand. The
>>>> ThreadMetadata
>>>>> will contain a set of all task assigned to that thread. Any health
>>> check
>>>>> service will just need to query all clients and aggregate their
>>> responses
>>>>> to get a complete picture of all tasks correct?
>>>>>
>>>>> walker
>>>>>
>>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wa...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest we
>>>>>> consolidate on the existing `TaskMetadata` since we have already
>>>>>> accumulated a bunch of such classes, and it's better to keep them
>>> small
>>>> as
>>>>>> public APIs. You can see
>>>>> https://issues.apache.org/jira/browse/KAFKA-12370
>>>>>> for a reference and a proposal.
>>>>>>
>>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
>>>> reluctanthero104@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the updates Walker. Some replies and follow-up
>>> questions:
>>>>>>>
>>>>>>> 1. I agree one task could have multiple partitions, but when we
>>> hit a
>>>>>> delay
>>>>>>> in terms of offset progress, do we have a convenient way to
>> reverse
>>>>>> mapping
>>>>>>> TopicPartition to the problematic task? In production, I believe
>> it
>>>>> would
>>>>>>> be much quicker to identify the problem using task.id instead of
>>>> topic
>>>>>>> partition, especially when it points to an internal topic. I
>> think
>>>>> having
>>>>>>> the task id as part of the entry value seems useful, which means
>>>>> getting
>>>>>>> something like Map<TopicPartition, TaskProgress> where
>> TaskProgress
>>>>>>> contains both committed offsets & task id.
>>>>>>>
>>>>>>> 2. The task idling API was still confusing. I don't think we care
>>>> about
>>>>>> the
>>>>>>> exact state when making tasksIdling() query, instead we care more
>>>> about
>>>>>> how
>>>>>>> long one task has been in idle state since when you called, which
>>>>>> reflects
>>>>>>> whether it is a normal idling period. So I feel it might be
>> helpful
>>>> to
>>>>>>> track that time difference and report it in the TaskStatus
>> struct.
>>>>>>>
>>>>>>> 3. What I want to achieve to have some global mapping of either
>>>>>>> TopicPartition or TaskId was that it is not possible for a health
>>>> check
>>>>>>> service to report a task failure that doesn't emit any metrics.
>> So
>>> as
>>>>>> long
>>>>>>> as we have a global topic partition API, health check could
>> always
>>> be
>>>>>> aware
>>>>>>> of any task/partition not reporting its progress, does that make
>>>> sense?
>>>>>> If
>>>>>>> you feel we have a better way to achieve this, such as querying
>> all
>>>> the
>>>>>>> input/intermediate topic metadata directly from Kafka for the
>>>>> baseline, I
>>>>>>> think that should be good as well and worth mentioning it in the
>>> KIP.
>>>>>>>
>>>>>>> Also it seems that the KIP hasn't reflected what you proposed for
>>> the
>>>>>> task
>>>>>>> idling status.
>>>>>>>
>>>>>>> Best,
>>>>>>> Boyang
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
>>>> wcarlson@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thank you for the comments everyone!
>>>>>>>>
>>>>>>>> I think there are a few things I can clear up in general then I
>>>> will
>>>>>>>> specifically respond to each question.
>>>>>>>>
>>>>>>>> First, when I say "idling" I refer to task idling. Where the
>>> stream
>>>>> is
>>>>>>>> intentionally not making progress. (
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10091 is an
>>> example).
>>>>> This
>>>>>>>> becomes relevant if a task is waiting on one partition with no
>>> data
>>>>> but
>>>>>>>> that is holding up a partition with data. That would cause one
>>> just
>>>>>>> looking
>>>>>>>> at the committed offset changes to believe the task has a
>> problem
>>>>> when
>>>>>> it
>>>>>>>> is working as intended.
>>>>>>>>
>>>>>>>> In light of this confusion, I plan to change tasksIdling() to
>>>>>>> `Map<TaskId,
>>>>>>>> TaskStatus> getTasksStatus()` this should hopefully make it
>> more
>>>>> clear
>>>>>>> what
>>>>>>>> is being exposed.
>>>>>>>>
>>>>>>>> TaskStatus would include: TopicPartitions, TaskId,
>>> ProcessorTopology,
>>>>>>> Idling,
>>>>>>>> and State.
>>>>>>>>
>>>>>>>> Boyang:
>>>>>>>>
>>>>>>>> 2) I think that each task should report on whatever
>>> TopicPartitions
>>>>>> they
>>>>>>>> hold, this means a Topic Partition might get reported twice but
>>> the
>>>>>> user
>>>>>>>> can roll those up and use the larger one when looking at the
>>> whole
>>>>> app.
>>>>>>>>
>>>>>>>> 4) If the user collects the committed offsets across all the
>>>> running
>>>>>>>> clients there shouldn't be any tasks missing correct?
>>>>>>>>
>>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and
>>>>>> TopicPartitions I
>>>>>>>> think it is cleaner to report them separately.
>>>>>>>>
>>>>>>>> Guozhang:
>>>>>>>>
>>>>>>>> 1) Yes, that was my original plan but it made more sense to
>>> mirror
>>>>> how
>>>>>>> the
>>>>>>>> consumer exposes the committed offset.
>>>>>>>>
>>>>>>>> 3) That is a good point. I think that we should include
>> internal
>>>>> topics
>>>>>>> as
>>>>>>>> well. I think that if the topology were to evolve there should
>> be
>>>>> fair
>>>>>>>> warning anyways. Maybe you can clarify what would be limited by
>>>>>> exposing
>>>>>>>> the interior topics here? I thought a user could find them in
>>> other
>>>>>> ways.
>>>>>>>> If it is the name we could anonymize them before exposing them.
>>>>>>>>
>>>>>>>> Thank you all for your comments. If I did not respond directly
>> to
>>>> one
>>>>>> of
>>>>>>>> your questions I updated the kip to include the details it was
>>>>>>> requesting.
>>>>>>>> I did not include my proposed changes mentioned earlier as I
>>>> would
>>>>>>> like
>>>>>>>> to get some feedback about what to include in TaskStatus and in
>>>>>> general.
>>>>>>>>
>>>>>>>> best,
>>>>>>>> Walker
>>>>>>>>
>>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <
>>> wangguoz@gmail.com
>>>>>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
>>>>>>>>>
>>>>>>>>> 1) Have you considered just relying on the
>>>> `KafkaStreams#metrics()`
>>>>>>> that
>>>>>>>>> includes embedded consumer metrics that have the committed
>>>> offsets
>>>>>>>>> instead of adding a new API? Not advocating that this is a
>>> better
>>>>>>>> approach
>>>>>>>>> but want to make sure we considered all options before we
>> come
>>> to
>>>>> the
>>>>>>>> "last
>>>>>>>>> resort" of adding new public interfaces.
>>>>>>>>>
>>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but
>>> the
>>>>>>> returned
>>>>>>>>> map is on partitions. I think we should make the javadoc and
>>> the
>>>>>> return
>>>>>>>>> types consistent, either tasks or topic partitions.
>>>>>>>>>
>>>>>>>>> 3) In addition, if for 2) above we ended up with topic
>>>> partitions,
>>>>>> then
>>>>>>>>> would they include only external source topics, or also
>>> including
>>>>>>>> internal
>>>>>>>>> repartition / changelog topics? I think including only
>> external
>>>>>> source
>>>>>>>>> topic partitions are not sufficient for your goal of tracking
>>>>>> progress,
>>>>>>>> but
>>>>>>>>> exposing internal topic names are also a big commitment here
>>> for
>>>>>> future
>>>>>>>>> topology evolution.
>>>>>>>>>
>>>>>>>>> 4)  For "tasksIdling", I'm wondering if we can make it more
>>>>> general,
>>>>>>> that
>>>>>>>>> the returned value is not just a boolean, but a TaskState
>> that
>>>> can
>>>>> be
>>>>>>> an
>>>>>>>>> enum of "created, restoring, running, idle, closing". This
>>> could
>>>>> help
>>>>>>> us
>>>>>>>> in
>>>>>>>>> the future to track other things like restoration efficiency
>>> and
>>>>>>>> rebalance
>>>>>>>>> efficiency etc.
>>>>>>>>>
>>>>>>>>> 5) We need to clarify how is "idling" being defined here:
>> e.g.
>>> we
>>>>> can
>>>>>>>>> clearly state that a task is considered idle only if 1) lag
>> is
>>>>>>>>> increasing, indicating that there are indeed new records
>>> arrived
>>>> at
>>>>>>>> source,
>>>>>>>>> while committed offset is not advancing, AND 2) produced
>> offset
>>>>>>> (imagine
>>>>>>>> we
>>>>>>>>> may have punctuations that generate new data to the output
>>> topic
>>>>> even
>>>>>>> if
>>>>>>>>> there's no input for a while) is not advancing either.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
>>>>>>> reluctanthero104@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely
>>>>> empower
>>>>>>>>> KStream
>>>>>>>>>> users with better visibility.
>>>>>>>>>>
>>>>>>>>>> Meanwhile I got a couple of questions/suggestions:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 1. typo "repost/report" in the motivation section.
>>>>>>>>>>
>>>>>>>>>> 2. What offsets do we report when the task is under
>>> restoration
>>>>> or
>>>>>>>>>> rebalancing?
>>>>>>>>>>
>>>>>>>>>> 3. IIUC, we should clearly state that our reported metrics
>>> are
>>>>>> based
>>>>>>>> off
>>>>>>>>>> locally assigned tasks for each instance.
>>>>>>>>>>
>>>>>>>>>> 4. In the meantime, what’s our strategy to report tasks
>> that
>>>> are
>>>>>> not
>>>>>>>>> local
>>>>>>>>>> to the instance? Users would normally try to monitor all
>> the
>>>>>> possible
>>>>>>>>>> tasks, and it’s unfortunate we couldn’t determine whether
>> we
>>>> have
>>>>>>> lost
>>>>>>>>>> tasks. My brainstorming was whether it makes sense for the
>>>> leader
>>>>>>>>> instance
>>>>>>>>>> to report the task progress as -1 for all “supposed to be
>>>>> running”
>>>>>>>> tasks,
>>>>>>>>>> so that on the metrics collector side it could catch any
>>>> missing
>>>>>>> tasks.
>>>>>>>>>>
>>>>>>>>>> 5. It seems not clear how users should use `isTaskIdling`.
>>> Why
>>>>> not
>>>>>>>>> report a
>>>>>>>>>> map/set for idling tasks just as what we did for committed
>>>>> offsets?
>>>>>>>>>>
>>>>>>>>>> 6. Why do we use TopicPartition instead of TaskId as the
>> key
>>> in
>>>>> the
>>>>>>>>>> returned map?
>>>>>>>>>> 7. Could we include some details in where we got the commit
>>>>> offsets
>>>>>>> for
>>>>>>>>>> each task? Is it through consumer offset fetch, or the
>> stream
>>>>>>>> processing
>>>>>>>>>> progress based on the records fetched?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
>>>>>>> wcarlson@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all,
>>>>>>>>>>>
>>>>>>>>>>> I would like to start discussion on KIP-715. This kip
>> aims
>>> to
>>>>>> make
>>>>>>> it
>>>>>>>>>>> easier to monitor Kafka Streams progress by exposing the
>>>>>> committed
>>>>>>>>> offset
>>>>>>>>>>> in a similar way as the consumer client does.
>>>>>>>>>>>
>>>>>>>>>>> Here is the KIP:
>>>> https://cwiki.apache.org/confluence/x/aRRRCg
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Walker
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Walker Carlson <wc...@confluent.io>.
Sure thing Boyang,

1) it is in proposed changes. I expanded on it a bit more now.
2) done
3) and done :)

thanks for the suggestions,
walker

On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <re...@gmail.com>
wrote:

> Thanks Walker. Some minor comments:
>
> 1. Could you add a reference to localThreadMetadata method in the KIP?
> 2. Could you make the code block as a java template, such that
> TaskMetadata.java could be as the template title? Also it would be good to
> add some meta comments about the newly added functions.
> 3. Could you write more details about rejected alternatives? Just as why we
> don't choose to expose as metrics, and how a new method on KStream is not
> favorable. These would be valuable when we look back on our design
> decisions.
>
> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wc...@confluent.io>
> wrote:
>
> > I understand now. I think that is a valid concern but I think it is best
> > solved by having an external service verify through streams. As this KIP
> > is now just adding fields to TaskMetadata to be returned in the
> > threadMetadata I am going to say that is out of scope.
> >
> > That seems to be the last concern. If there are no others I will put this
> > up for a vote soon.
> >
> > walker
> >
> > On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <reluctanthero104@gmail.com
> >
> > wrote:
> >
> > > For the 3rd point, yes, what I'm proposing is an edge case. For
> example,
> > > when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing
> logic
> > > causing no one gets 1_1 assigned. Then the health check service will
> only
> > > see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
> paying
> > > attention to 1_1. What I want to expose is a "logical global" view of
> all
> > > the tasks through the stream instance, since each instance gets the
> > > assigned topology and should be able to infer all the exact tasks to be
> > up
> > > and running when the service is healthy.
> > >
> > > On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wcarlson@confluent.io
> >
> > > wrote:
> > >
> > > > Thanks for the follow up Boyang and Guozhang,
> > > >
> > > > I have updated the kip to include these ideas.
> > > >
> > > > Guozhang, that is a good idea about using the TaskMetadata. We can
> get
> > it
> > > > through the ThreadMetadata with a minor change to
> `localThreadMetadata`
> > > in
> > > > kafkaStreams. This means that we will only need to update
> TaskMetadata
> > > and
> > > > add no other APIs
> > > >
> > > > Boyang, since each TaskMetadata contains the TaskId and
> > TopicPartitions I
> > > > don't believe mapping either way will be a problem. Also I think we
> can
> > > do
> > > > something like record the time the task started idling and when it
> > stops
> > > > idling we can override it to -1. I think that should clear up the
> first
> > > two
> > > > points.
> > > >
> > > > As for your third point I am not sure I 100% understand. The
> > > ThreadMetadata
> > > > > will contain a set of all tasks assigned to that thread. Any health
> > check
> > > > service will just need to query all clients and aggregate their
> > responses
> > > > to get a complete picture of all tasks correct?
> > > >
> > > > walker
> > > >
> > > > On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Regarding the second API and the `TaskStatus` class: I'd suggest we
> > > > > consolidate on the existing `TaskMetadata` since we have already
> > > > > > accumulated a bunch of such classes, and it's better to keep them
> > small
> > > as
> > > > > public APIs. You can see
> > > > https://issues.apache.org/jira/browse/KAFKA-12370
> > > > > for a reference and a proposal.
> > > > >
> > > > > On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
> > > reluctanthero104@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the updates Walker. Some replies and follow-up
> > questions:
> > > > > >
> > > > > > 1. I agree one task could have multiple partitions, but when we
> > hit a
> > > > > delay
> > > > > > in terms of offset progress, do we have a convenient way to
> reverse
> > > > > mapping
> > > > > > TopicPartition to the problematic task? In production, I believe
> it
> > > > would
> > > > > > be much quicker to identify the problem using task.id instead of
> > > topic
> > > > > > partition, especially when it points to an internal topic. I
> think
> > > > having
> > > > > > the task id as part of the entry value seems useful, which means
> > > > getting
> > > > > > something like Map<TopicPartition, TaskProgress> where
> TaskProgress
> > > > > > contains both committed offsets & task id.
> > > > > >
> > > > > > 2. The task idling API was still confusing. I don't think we care
> > > about
> > > > > the
> > > > > > > exact state when making tasksIdling() query, instead we care more
> > > about
> > > > > how
> > > > > > long one task has been in idle state since when you called, which
> > > > > reflects
> > > > > > whether it is a normal idling period. So I feel it might be
> helpful
> > > to
> > > > > > track that time difference and report it in the TaskStatus
> struct.
> > > > > >
> > > > > > 3. What I want to achieve to have some global mapping of either
> > > > > > TopicPartition or TaskId was that it is not possible for a health
> > > check
> > > > > > service to report a task failure that doesn't emit any metrics.
> So
> > as
> > > > > long
> > > > > > as we have a global topic partition API, health check could
> always
> > be
> > > > > aware
> > > > > > of any task/partition not reporting its progress, does that make
> > > sense?
> > > > > If
> > > > > > you feel we have a better way to achieve this, such as querying
> all
> > > the
> > > > > > input/intermediate topic metadata directly from Kafka for the
> > > > baseline, I
> > > > > > think that should be good as well and worth mentioning it in the
> > KIP.
> > > > > >
> > > > > > Also it seems that the KIP hasn't reflected what you proposed for
> > the
> > > > > task
> > > > > > idling status.
> > > > > >
> > > > > > Best,
> > > > > > Boyang
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
> > > wcarlson@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Thank you for the comments everyone!
> > > > > > >
> > > > > > > I think there are a few things I can clear up in general then I
> > > will
> > > > > > > specifically respond to each question.
> > > > > > >
> > > > > > > First, when I say "idling" I refer to task idling. Where the
> > stream
> > > > is
> > > > > > > intentionally not making progress. (
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-10091 is an
> > example).
> > > > This
> > > > > > > becomes relevant if a task is waiting on one partition with no
> > data
> > > > but
> > > > > > > that is holding up a partition with data. That would cause one
> > just
> > > > > > looking
> > > > > > > at the committed offset changes to believe the task has a
> problem
> > > > when
> > > > > it
> > > > > > > is working as intended.
> > > > > > >
> > > > > > > In light of this confusion. I plan to change tasksIdling() to
> > > > > > `Map<TaskId,
> > > > > > > TaskStatus> getTasksStatus()` this should hopefully make it
> more
> > > > clear
> > > > > > what
> > > > > > > is being exposed.
> > > > > > >
> > > > > > > > TaskStatus would include: TopicPartitions, TaskId,
> > ProcessorTopology,
> > > > > > Idling,
> > > > > > > and State.
> > > > > > >
> > > > > > > Boyang:
> > > > > > >
> > > > > > > 2) I think that each task should report on whatever
> > TopicPartitions
> > > > > they
> > > > > > > hold, this means a Topic Partition might get reported twice but
> > the
> > > > > user
> > > > > > > can roll those up and use the larger one when looking at the
> > whole
> > > > app.
> > > > > > >
> > > > > > > 4) If the user collects the committed offsets across all the
> > > running
> > > > > > > clients there shouldn't be any tasks missing correct?
> > > > > > >
> > > > > > > 6) Because there is not a 1:1 mapping between Tasks and
> > > > > TopicPartitions I
> > > > > > > think it is cleaner to report them separately.
> > > > > > >
> > > > > > > Guozhang:
> > > > > > >
> > > > > > > 1) Yes, that was my original plan but it made more sense to
> > mirror
> > > > how
> > > > > > the
> > > > > > > consumer exposes the committed offset.
> > > > > > >
> > > > > > > 3) That is a good point. I think that we should include
> internal
> > > > topics
> > > > > > as
> > > > > > > well. I think that if the topology were to evolve there should
> be
> > > > fair
> > > > > > > warning anyways. Maybe you can clarify what would be limited by
> > > > > exposing
> > > > > > > the interior topics here? I thought a user could find them in
> > other
> > > > > ways.
> > > > > > > > If it is the name we could anonymize them before exposing them.
> > > > > > >
> > > > > > > Thank you all for your comments. If I did not respond directly
> to
> > > one
> > > > > of
> > > > > > > your questions I updated the kip to include the details it was
> > > > > > requesting.
> > > > > > > > I did not include my proposed changes mentioned earlier as I
> > > would
> > > > > > like
> > > > > > > to get some feedback about what to include in TaskStatus and in
> > > > > general.
> > > > > > >
> > > > > > > best,
> > > > > > > Walker
> > > > > > >
> > > > > > > On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <
> > wangguoz@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Walker, thanks for the KIP. A few thoughts:
> > > > > > > >
> > > > > > > > 1) Have you considered just relying on the
> > > `KafkaStreams#metrics()`
> > > > > > that
> > > > > > > > includes embedded consumer metrics that have the committed
> > > offsets
> > > > > > > > instead of adding a new API? Not advocating that this is a
> > better
> > > > > > > approach
> > > > > > > > but want to make sure we considered all options before we
> come
> > to
> > > > the
> > > > > > > "last
> > > > > > > > resort" of adding new public interfaces.
> > > > > > > >
> > > > > > > > 2) The javadoc mentions "tasks assigned to this client", but
> > the
> > > > > > returned
> > > > > > > > map is on partitions. I think we should make the javadoc and
> > the
> > > > > return
> > > > > > > > types consistent, either tasks or topic partitions.
> > > > > > > >
> > > > > > > > 3) In addition, if for 2) above we ended up with topic
> > > partitions,
> > > > > then
> > > > > > > > would they include only external source topics, or also
> > including
> > > > > > > internal
> > > > > > > > repartition / changelog topics? I think including only
> external
> > > > > source
> > > > > > > > topic partitions are not sufficient for your goal of tracking
> > > > > progress,
> > > > > > > but
> > > > > > > > exposing internal topic names are also a big commitment here
> > for
> > > > > future
> > > > > > > > topology evolution.
> > > > > > > >
> > > > > > > > 4)  For "tasksIdling", I'm wondering if we can make it more
> > > > general,
> > > > > > that
> > > > > > > > the returned value is not just a boolean, but a TaskState
> that
> > > can
> > > > be
> > > > > > an
> > > > > > > > enum of "created, restoring, running, idle, closing". This
> > could
> > > > help
> > > > > > us
> > > > > > > in
> > > > > > > > the future to track other things like restoration efficiency
> > and
> > > > > > > rebalance
> > > > > > > > efficiency etc.
> > > > > > > >
> > > > > > > > 5) We need to clarify how is "idling" being defined here:
> e.g.
> > we
> > > > can
> > > > > > > > clearly state that a task is considered idle only if 1) lag
> is
> > > > > > > > increasing, indicating that there are indeed new records
> > arrived
> > > at
> > > > > > > source,
> > > > > > > > while committed offset is not advancing, AND 2) produced
> offset
> > > > > > (imagine
> > > > > > > we
> > > > > > > > may have punctuations that generate new data to the output
> > topic
> > > > even
> > > > > > if
> > > > > > > > there's no input for a while) is not advancing either.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
> > > > > > reluctanthero104@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Walker for the proposed KIP! This should definitely
> > > > empower
> > > > > > > > KStream
> > > > > > > > > users with better visibility.
> > > > > > > > >
> > > > > > > > > Meanwhile I got a couple of questions/suggestions:
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > 1. typo "repost/report" in the motivation section.
> > > > > > > > >
> > > > > > > > > 2. What offsets do we report when the task is under
> > restoration
> > > > or
> > > > > > > > > rebalancing?
> > > > > > > > >
> > > > > > > > > 3. IIUC, we should clearly state that our reported metrics
> > are
> > > > > based
> > > > > > > off
> > > > > > > > > locally assigned tasks for each instance.
> > > > > > > > >
> > > > > > > > > 4. In the meantime, what’s our strategy to report tasks
> that
> > > are
> > > > > not
> > > > > > > > local
> > > > > > > > > to the instance? Users would normally try to monitor all
> the
> > > > > possible
> > > > > > > > > tasks, and it’s unfortunate we couldn’t determine whether
> we
> > > have
> > > > > > lost
> > > > > > > > > tasks. My brainstorming was whether it makes sense for the
> > > leader
> > > > > > > > instance
> > > > > > > > > to report the task progress as -1 for all “supposed to be
> > > > running”
> > > > > > > tasks,
> > > > > > > > > so that on the metrics collector side it could catch any
> > > missing
> > > > > > tasks.
> > > > > > > > >
> > > > > > > > > 5. It seems not clear how users should use `isTaskIdling`.
> > Why
> > > > not
> > > > > > > > report a
> > > > > > > > > map/set for idling tasks just as what we did for committed
> > > > offsets?
> > > > > > > > >
> > > > > > > > > 6. Why do we use TopicPartition instead of TaskId as the
> key
> > in
> > > > the
> > > > > > > > > returned map?
> > > > > > > > > 7. Could we include some details in where we got the commit
> > > > offsets
> > > > > > for
> > > > > > > > > each task? Is it through consumer offset fetch, or the
> stream
> > > > > > > processing
> > > > > > > > > progress based on the records fetched?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
> > > > > > wcarlson@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hello all,
> > > > > > > > > >
> > > > > > > > > > I would like to start discussion on KIP-715. This kip
> aims
> > to
> > > > > make
> > > > > > it
> > > > > > > > > > easier to monitor Kafka Streams progress by exposing the
> > > > > committed
> > > > > > > > offset
> > > > > > > > > > in a similar way as the consumer client does.
> > > > > > > > > >
> > > > > > > > > > Here is the KIP:
> > > https://cwiki.apache.org/confluence/x/aRRRCg
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Walker
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Boyang Chen <re...@gmail.com>.
Thanks Walker. Some minor comments:

1. Could you add a reference to the localThreadMetadata method in the KIP?
2. Could you format the code block as a Java template, so that
TaskMetadata.java is the template title? It would also be good to
add some comments describing the newly added functions.
3. Could you write more details about the rejected alternatives, such as why
we chose not to expose this as metrics, and why a new method on KStream is
not favorable? These would be valuable when we look back on our design
decisions.

On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wc...@confluent.io>
wrote:

> I understand now. I think that is a valid concern but I think it is best
> solved by having an external service verify through streams. As this KIP
> is now just adding fields to TaskMetadata to be returned in the
> threadMetadata I am going to say that is out of scope.
>
> That seems to be the last concern. If there are no others I will put this
> up for a vote soon.
>
> walker
>
> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <re...@gmail.com>
> wrote:
>
> > For the 3rd point, yes, what I'm proposing is an edge case. For example,
> > when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing logic
> > causing no one gets 1_1 assigned. Then the health check service will only
> > see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not paying
> > attention to 1_1. What I want to expose is a "logical global" view of all
> > the tasks through the stream instance, since each instance gets the
> > assigned topology and should be able to infer all the exact tasks to be
> up
> > and running when the service is healthy.
> >
> > On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wc...@confluent.io>
> > wrote:
> >
> > > Thanks for the follow up Boyang and Guozhang,
> > >
> > > I have updated the kip to include these ideas.
> > >
> > > Guozhang, that is a good idea about using the TaskMetadata. We can get
> it
> > > through the ThreadMetadata with a minor change to `localThreadMetadata`
> > in
> > > kafkaStreams. This means that we will only need to update TaskMetadata
> > and
> > > add no other APIs
> > >
> > > Boyang, since each TaskMetadata contains the TaskId and
> TopicPartitions I
> > > don't believe mapping either way will be a problem. Also I think we can
> > do
> > > something like record the time the task started idling and when it
> stops
> > > idling we can override it to -1. I think that should clear up the first
> > two
> > > points.
> > >
> > > As for your third point I am not sure I 100% understand. The
> > ThreadMetadata
> > > > will contain a set of all tasks assigned to that thread. Any health
> check
> > > service will just need to query all clients and aggregate their
> responses
> > > to get a complete picture of all tasks correct?
> > >
> > > walker
> > >
> > > On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Regarding the second API and the `TaskStatus` class: I'd suggest we
> > > > consolidate on the existing `TaskMetadata` since we have already
> > > > > accumulated a bunch of such classes, and it's better to keep them
> small
> > as
> > > > public APIs. You can see
> > > https://issues.apache.org/jira/browse/KAFKA-12370
> > > > for a reference and a proposal.
> > > >
> > > > On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
> > reluctanthero104@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks for the updates Walker. Some replies and follow-up
> questions:
> > > > >
> > > > > 1. I agree one task could have multiple partitions, but when we
> hit a
> > > > delay
> > > > > in terms of offset progress, do we have a convenient way to reverse
> > > > mapping
> > > > > TopicPartition to the problematic task? In production, I believe it
> > > would
> > > > > be much quicker to identify the problem using task.id instead of
> > topic
> > > > > partition, especially when it points to an internal topic. I think
> > > having
> > > > > the task id as part of the entry value seems useful, which means
> > > getting
> > > > > something like Map<TopicPartition, TaskProgress> where TaskProgress
> > > > > contains both committed offsets & task id.
> > > > >
> > > > > 2. The task idling API was still confusing. I don't think we care
> > about
> > > > the
> > > > > > exact state when making tasksIdling() query, instead we care more
> > about
> > > > how
> > > > > long one task has been in idle state since when you called, which
> > > > reflects
> > > > > whether it is a normal idling period. So I feel it might be helpful
> > to
> > > > > track that time difference and report it in the TaskStatus struct.
> > > > >
> > > > > 3. What I want to achieve to have some global mapping of either
> > > > > TopicPartition or TaskId was that it is not possible for a health
> > check
> > > > > service to report a task failure that doesn't emit any metrics. So
> as
> > > > long
> > > > > as we have a global topic partition API, health check could always
> be
> > > > aware
> > > > > of any task/partition not reporting its progress, does that make
> > sense?
> > > > If
> > > > > you feel we have a better way to achieve this, such as querying all
> > the
> > > > > input/intermediate topic metadata directly from Kafka for the
> > > baseline, I
> > > > > think that should be good as well and worth mentioning it in the
> KIP.
> > > > >
> > > > > Also it seems that the KIP hasn't reflected what you proposed for
> the
> > > > task
> > > > > idling status.
> > > > >
> > > > > Best,
> > > > > Boyang
> > > > >
> > > > >
> > > > > On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
> > wcarlson@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Thank you for the comments everyone!
> > > > > >
> > > > > > I think there are a few things I can clear up in general then I
> > will
> > > > > > specifically respond to each question.
> > > > > >
> > > > > > First, when I say "idling" I refer to task idling. Where the
> stream
> > > is
> > > > > > intentionally not making progress. (
> > > > > > https://issues.apache.org/jira/browse/KAFKA-10091 is an
> example).
> > > This
> > > > > > becomes relevant if a task is waiting on one partition with no
> data
> > > but
> > > > > > that is holding up a partition with data. That would cause one
> just
> > > > > looking
> > > > > > at the committed offset changes to believe the task has a problem
> > > when
> > > > it
> > > > > > is working as intended.
> > > > > >
> > > > > > In light of this confusion. I plan to change tasksIdling() to
> > > > > `Map<TaskId,
> > > > > > TaskStatus> getTasksStatus()` this should hopefully make it more
> > > clear
> > > > > what
> > > > > > is being exposed.
> > > > > >
> > > > > > > TaskStatus would include: TopicPartitions, TaskId,
> ProcessorTopology,
> > > > > Idling,
> > > > > > and State.
> > > > > >
> > > > > > Boyang:
> > > > > >
> > > > > > 2) I think that each task should report on whatever
> TopicPartitions
> > > > they
> > > > > > hold, this means a Topic Partition might get reported twice but
> the
> > > > user
> > > > > > can roll those up and use the larger one when looking at the
> whole
> > > app.
> > > > > >
> > > > > > 4) If the user collects the committed offsets across all the
> > running
> > > > > > clients there shouldn't be any tasks missing correct?
> > > > > >
> > > > > > 6) Because there is not a 1:1 mapping between Tasks and
> > > > TopicPartitions I
> > > > > > think it is cleaner to report them separately.
> > > > > >
> > > > > > Guozhang:
> > > > > >
> > > > > > 1) Yes, that was my original plan but it made more sense to
> mirror
> > > how
> > > > > the
> > > > > > consumer exposes the committed offset.
> > > > > >
> > > > > > 3) That is a good point. I think that we should include internal
> > > topics
> > > > > as
> > > > > > well. I think that if the topology were to evolve there should be
> > > fair
> > > > > > warning anyways. Maybe you can clarify what would be limited by
> > > > exposing
> > > > > > the interior topics here? I thought a user could find them in
> other
> > > > ways.
> > > > > > > If it is the name we could anonymize them before exposing them.
> > > > > >
> > > > > > Thank you all for your comments. If I did not respond directly to
> > one
> > > > of
> > > > > > your questions I updated the kip to include the details it was
> > > > > requesting.
> > > > > > > I did not include my proposed changes mentioned earlier as I
> > would
> > > > > like
> > > > > > to get some feedback about what to include in TaskStatus and in
> > > > general.
> > > > > >
> > > > > > best,
> > > > > > Walker
> > > > > >
> > > > > > On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hello Walker, thanks for the KIP. A few thoughts:
> > > > > > >
> > > > > > > 1) Have you considered just relying on the
> > `KafkaStreams#metrics()`
> > > > > that
> > > > > > > includes embedded consumer metrics that have the committed
> > offsets
> > > > > > > instead of adding a new API? Not advocating that this is a
> better
> > > > > > approach
> > > > > > > but want to make sure we considered all options before we come
> to
> > > the
> > > > > > "last
> > > > > > > resort" of adding new public interfaces.
> > > > > > >
> > > > > > > 2) The javadoc mentions "tasks assigned to this client", but
> the
> > > > > returned
> > > > > > > map is on partitions. I think we should make the javadoc and
> the
> > > > return
> > > > > > > types consistent, either tasks or topic partitions.
> > > > > > >
> > > > > > > 3) In addition, if for 2) above we ended up with topic
> > partitions,
> > > > then
> > > > > > > would they include only external source topics, or also
> including
> > > > > > internal
> > > > > > > repartition / changelog topics? I think including only external
> > > > source
> > > > > > > topic partitions are not sufficient for your goal of tracking
> > > > progress,
> > > > > > but
> > > > > > > exposing internal topic names are also a big commitment here
> for
> > > > future
> > > > > > > topology evolution.
> > > > > > >
> > > > > > > 4)  For "tasksIdling", I'm wondering if we can make it more
> > > general,
> > > > > that
> > > > > > > the returned value is not just a boolean, but a TaskState that
> > can
> > > be
> > > > > an
> > > > > > > enum of "created, restoring, running, idle, closing". This
> could
> > > help
> > > > > us
> > > > > > in
> > > > > > > the future to track other things like restoration efficiency
> and
> > > > > > rebalance
> > > > > > > efficiency etc.
> > > > > > >
> > > > > > > 5) We need to clarify how is "idling" being defined here: e.g.
> we
> > > can
> > > > > > > clearly state that a task is considered idle only if 1) lag is
> > > > > > > increasing, indicating that there are indeed new records
> arrived
> > at
> > > > > > source,
> > > > > > > while committed offset is not advancing, AND 2) produced offset
> > > > > (imagine
> > > > > > we
> > > > > > > may have punctuations that generate new data to the output
> topic
> > > even
> > > > > if
> > > > > > > there's no input for a while) is not advancing either.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
> > > > > reluctanthero104@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks Walker for the proposed KIP! This should definitely
> > > empower
> > > > > > > KStream
> > > > > > > > users with better visibility.
> > > > > > > >
> > > > > > > > Meanwhile I got a couple of questions/suggestions:
> > > > > > > >
> > > > > > > >
> > > > > > > > 1. typo "repost/report" in the motivation section.
> > > > > > > >
> > > > > > > > 2. What offsets do we report when the task is under
> restoration
> > > or
> > > > > > > > rebalancing?
> > > > > > > >
> > > > > > > > 3. IIUC, we should clearly state that our reported metrics
> are
> > > > based
> > > > > > off
> > > > > > > > locally assigned tasks for each instance.
> > > > > > > >
> > > > > > > > 4. In the meantime, what’s our strategy to report tasks that
> > are
> > > > not
> > > > > > > local
> > > > > > > > to the instance? Users would normally try to monitor all the
> > > > possible
> > > > > > > > tasks, and it’s unfortunate we couldn’t determine whether we
> > have
> > > > > lost
> > > > > > > > tasks. My brainstorming was whether it makes sense for the
> > leader
> > > > > > > instance
> > > > > > > > to report the task progress as -1 for all “supposed to be
> > > running”
> > > > > > tasks,
> > > > > > > > so that on the metrics collector side it could catch any
> > missing
> > > > > tasks.
> > > > > > > >
> > > > > > > > 5. It seems not clear how users should use `isTaskIdling`.
> Why
> > > not
> > > > > > > report a
> > > > > > > > map/set for idling tasks just as what we did for committed
> > > offsets?
> > > > > > > >
> > > > > > > > 6. Why do we use TopicPartition instead of TaskId as the key
> in
> > > the
> > > > > > > > returned map?
> > > > > > > > 7. Could we include some details in where we got the commit
> > > offsets
> > > > > for
> > > > > > > > each task? Is it through consumer offset fetch, or the stream
> > > > > > processing
> > > > > > > > progress based on the records fetched?
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
> > > > > wcarlson@confluent.io>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello all,
> > > > > > > > >
> > > > > > > > > I would like to start discussion on KIP-715. This kip aims
> to
> > > > make
> > > > > it
> > > > > > > > > easier to monitor Kafka Streams progress by exposing the
> > > > committed
> > > > > > > offset
> > > > > > > > > in a similar way as the consumer client does.
> > > > > > > > >
> > > > > > > > > Here is the KIP:
> > https://cwiki.apache.org/confluence/x/aRRRCg
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Walker
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Walker Carlson <wc...@confluent.io>.
I understand now. I think that is a valid concern, but I think it is best
solved by having an external service verify through streams. As this KIP
is now just adding fields to TaskMetadata to be returned in the
ThreadMetadata, I am going to say that is out of scope.

That seems to be the last concern. If there are no others I will put this
up for a vote soon.

walker

On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <re...@gmail.com>
wrote:

> For the 3rd point, yes, what I'm proposing is an edge case. For example,
> when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing logic
> causing no one gets 1_1 assigned. Then the health check service will only
> see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not paying
> attention to 1_1. What I want to expose is a "logical global" view of all
> the tasks through the stream instance, since each instance gets the
> assigned topology and should be able to infer all the exact tasks to be up
> and running when the service is healthy.
>
> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wc...@confluent.io>
> wrote:
>
> > Thanks for the follow up Boyang and Guozhang,
> >
> > I have updated the kip to include these ideas.
> >
> > Guozhang, that is a good idea about using the TaskMetadata. We can get it
> > through the ThreadMetadata with a minor change to `localThreadMetadata`
> in
> > kafkaStreams. This means that we will only need to update TaskMetadata
> and
> > add no other APIs
> >
> > Boyang, since each TaskMetadata contains the TaskId and TopicPartitions I
> > don't believe mapping either way will be a problem. Also I think we can
> do
> > something like record the time the task started idling and when it stops
> > idling we can override it to -1. I think that should clear up the first
> two
> > points.
> >
> > As for your third point I am not sure I 100% understand. The
> ThreadMetadata
> > will contain a set of all task assigned to that thread. Any health check
> > service will just need to query all clients and aggregate their responses
> > to get a complete picture of all tasks correct?
> >
> > walker
> >
> > On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Regarding the second API and the `TaskStatus` class: I'd suggest we
> > > consolidate on the existing `TaskMetadata` since we have already
> > > accumulated a bunch of such classes, and its better to keep them small
> as
> > > public APIs. You can see
> > https://issues.apache.org/jira/browse/KAFKA-12370
> > > for a reference and a proposal.
> > >
> > > On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
> reluctanthero104@gmail.com>
> > > wrote:
> > >
> > > > Thanks for the updates Walker. Some replies and follow-up questions:
> > > >
> > > > 1. I agree one task could have multiple partitions, but when we hit a
> > > delay
> > > > in terms of offset progress, do we have a convenient way to reverse
> > > mapping
> > > > TopicPartition to the problematic task? In production, I believe it
> > would
> > > > be much quicker to identify the problem using task.id instead of
> topic
> > > > partition, especially when it points to an internal topic. I think
> > having
> > > > the task id as part of the entry value seems useful, which means
> > getting
> > > > something like Map<TopicPartition, TaskProgress> where TaskProgress
> > > > contains both committed offsets & task id.
> > > >
> > > > 2. The task idling API was still confusing. I don't think we care
> about
> > > the
> > > > exact state when making tasksIdling()query, instead we care more
> about
> > > how
> > > > long one task has been in idle state since when you called, which
> > > reflects
> > > > whether it is a normal idling period. So I feel it might be helpful
> to
> > > > track that time difference and report it in the TaskStatus struct.
> > > >
> > > > 3. What I want to achieve to have some global mapping of either
> > > > TopicPartition or TaskId was that it is not possible for a health
> check
> > > > service to report a task failure that doesn't emit any metrics. So as
> > > long
> > > > as we have a global topic partition API, health check could always be
> > > aware
> > > > of any task/partition not reporting its progress, does that make
> sense?
> > > If
> > > > you feel we have a better way to achieve this, such as querying all
> the
> > > > input/intermediate topic metadata directly from Kafka for the
> > baseline, I
> > > > think that should be good as well and worth mentioning it in the KIP.
> > > >
> > > > Also it seems that the KIP hasn't reflected what you proposed for the
> > > task
> > > > idling status.
> > > >
> > > > Best,
> > > > Boyang
> > > >
> > > >
> > > > On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
> wcarlson@confluent.io>
> > > > wrote:
> > > >
> > > > > Thank you for the comments everyone!
> > > > >
> > > > > I think there are a few things I can clear up in general then I
> will
> > > > > specifically respond to each question.
> > > > >
> > > > > First, when I say "idling" I refer to task idling. Where the stream
> > is
> > > > > intentionally not making progress. (
> > > > > https://issues.apache.org/jira/browse/KAFKA-10091 is an example).
> > This
> > > > > becomes relevant if a task is waiting on one partition with no data
> > but
> > > > > that is holding up a partition with data. That would cause one just
> > > > looking
> > > > > at the committed offset changes to believe the task has a problem
> > when
> > > it
> > > > > is working as intended.
> > > > >
> > > > > In light of this confusion. I plan to change tasksIdling() to
> > > > `Map<TaskId,
> > > > > TaskStatus> getTasksStatus()` this should hopefully make it more
> > clear
> > > > what
> > > > > is being exposed.
> > > > >
> > > > > TaskStatus would include: TopicPartions, TaskId, ProcessorTopology,
> > > > Idling,
> > > > > and State.
> > > > >
> > > > > Boyang:
> > > > >
> > > > > 2) I think that each task should report on whatever TopicPartitions
> > > they
> > > > > hold, this means a Topic Partition might get reported twice but the
> > > user
> > > > > can roll those up and use the larger one when looking at the whole
> > app.
> > > > >
> > > > > 4) If the user collects the committed offsets across all the
> running
> > > > > clients there shouldn't be any tasks missing correct?
> > > > >
> > > > > 6) Because there is not a 1:1 mapping between Tasks and
> > > TopicPartitions I
> > > > > think it is cleaner to report them separately.
> > > > >
> > > > > Guozhang:
> > > > >
> > > > > 1) Yes, that was my original plan but it made more sense to mirror
> > how
> > > > the
> > > > > consumer exposes the committed offset.
> > > > >
> > > > > 3) That is a good point. I think that we should include internal
> > topics
> > > > as
> > > > > well. I think that if the topology were to evolve there should be
> > fair
> > > > > warning anyways. Maybe you can clarify what would be limited by
> > > exposing
> > > > > the interior topics here? I thought a user could find them in other
> > > ways.
> > > > > If it is the name we could aynomise them before exposing them.
> > > > >
> > > > > Thank you all for your comments. If I did not respond directly to
> one
> > > of
> > > > > your questions I updated the kip to include the details it was
> > > > requesting.
> > > > > I didn't not include my proposed changes mentioned earlier as I
> would
> > > > like
> > > > > to get some feedback about what to include in TaskStatus and in
> > > general.
> > > > >
> > > > > best,
> > > > > Walker
> > > > >
> > > > > On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hello Walker, thanks for the KIP. A few thoughts:
> > > > > >
> > > > > > 1) Have you considered just relying on the
> `KafkaStreams#metrics()`
> > > > that
> > > > > > includes embedded consumer metrics that have the committed
> offsets
> > > > > > instead of adding a new API? Not advocating that this is a better
> > > > > approach
> > > > > > but want to make sure we considered all options before we come to
> > the
> > > > > "last
> > > > > > resort" of adding new public interfaces.
> > > > > >
> > > > > > 2) The javadoc mentions "tasks assigned to this client", but the
> > > > returned
> > > > > > map is on partitions. I think we should make the javadoc and the
> > > return
> > > > > > types consistent, either tasks or topic partitions.
> > > > > >
> > > > > > 3) In addition, if for 2) above we ended up with topic
> partitions,
> > > then
> > > > > > would they include only external source topics, or also including
> > > > > internal
> > > > > > repartition / changelog topics? I think including only external
> > > source
> > > > > > topic partitions are not sufficient for your goal of tracking
> > > progress,
> > > > > but
> > > > > > exposing internal topic names are also a big commitment here for
> > > future
> > > > > > topology evolution.
> > > > > >
> > > > > > 4)  For "tasksIdling", I'm wondering if we can make it more
> > general,
> > > > that
> > > > > > the returned value is not just a boolean, but a TaskState that
> can
> > be
> > > > an
> > > > > > enum of "created, restoring, running, idle, closing". This could
> > help
> > > > us
> > > > > in
> > > > > > the future to track other things like restoration efficiency and
> > > > > rebalance
> > > > > > efficiency etc.
> > > > > >
> > > > > > 5) We need to clarify how is "idling" being defined here: e.g. we
> > can
> > > > > > clearly state that a task is considered idle only if 1) lag is
> > > > > > increasing, indicating that there are indeed new records arrived
> at
> > > > > source,
> > > > > > while committed offset is not advancing, AND 2) produced offset
> > > > (imagine
> > > > > we
> > > > > > may have punctuations that generate new data to the output topic
> > even
> > > > if
> > > > > > there's no input for a while) is not advancing either.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
> > > > reluctanthero104@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks Walker for the proposed KIP! This should definitely
> > empower
> > > > > > KStream
> > > > > > > users with better visibility.
> > > > > > >
> > > > > > > Meanwhile I got a couple of questions/suggestions:
> > > > > > >
> > > > > > >
> > > > > > > 1. typo "repost/report" in the motivation section.
> > > > > > >
> > > > > > > 2. What offsets do we report when the task is under restoration
> > or
> > > > > > > rebalancing?
> > > > > > >
> > > > > > > 3. IIUC, we should clearly state that our reported metrics are
> > > based
> > > > > off
> > > > > > > locally assigned tasks for each instance.
> > > > > > >
> > > > > > > 4. In the meantime, what’s our strategy to report tasks that
> are
> > > not
> > > > > > local
> > > > > > > to the instance? Users would normally try to monitor all the
> > > possible
> > > > > > > tasks, and it’s unfortunate we couldn’t determine whether we
> have
> > > > lost
> > > > > > > tasks. My brainstorming was whether it makes sense for the
> leader
> > > > > > instance
> > > > > > > to report the task progress as -1 for all “supposed to be
> > running”
> > > > > tasks,
> > > > > > > so that on the metrics collector side it could catch any
> missing
> > > > tasks.
> > > > > > >
> > > > > > > 5. It seems not clear how users should use `isTaskIdling`. Why
> > not
> > > > > > report a
> > > > > > > map/set for idling tasks just as what we did for committed
> > offsets?
> > > > > > >
> > > > > > > 6. Why do we use TopicPartition instead of TaskId as the key in
> > the
> > > > > > > returned map?
> > > > > > > 7. Could we include some details in where we got the commit
> > offsets
> > > > for
> > > > > > > each task? Is it through consumer offset fetch, or the stream
> > > > > processing
> > > > > > > progress based on the records fetched?
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
> > > > wcarlson@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hello all,
> > > > > > > >
> > > > > > > > I would like to start discussion on KIP-715. This kip aims to
> > > make
> > > > it
> > > > > > > > easier to monitor Kafka Streams progress by exposing the
> > > committed
> > > > > > offset
> > > > > > > > in a similar way as the consumer client does.
> > > > > > > >
> > > > > > > > Here is the KIP:
> https://cwiki.apache.org/confluence/x/aRRRCg
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Walker
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Boyang Chen <re...@gmail.com>.
For the 3rd point, yes, what I'm proposing is an edge case. For example,
suppose we have 4 tasks [0_0, 0_1, 1_0, 1_1] and a bug in the rebalancing
logic causes no one to get 1_1 assigned. Then the health check service will
only see 3 tasks [0_0, 0_1, 1_0] reporting progress normally and will not
notice that 1_1 is missing. What I want to expose is a "logical global" view
of all the tasks through the stream instance, since each instance gets the
assigned topology and should be able to infer the exact set of tasks that
should be up and running when the service is healthy.
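
A minimal sketch of the check described above, assuming the health check
service already knows the expected task ids and has collected the ids that
the instances reported. `MissingTaskCheck` and `missingTasks` are hypothetical
names, not part of Kafka Streams, and task ids are modelled as plain strings:

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka Streams. Task ids are plain strings
// of the form "<subtopology>_<partition>", as in the example above.
final class MissingTaskCheck {

    // Returns the expected task ids that no instance reported.
    static Set<String> missingTasks(Set<String> expected, Set<String> reported) {
        Set<String> missing = new TreeSet<>(expected);
        missing.removeAll(reported);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> expected = Set.of("0_0", "0_1", "1_0", "1_1");
        Set<String> reported = Set.of("0_0", "0_1", "1_0");
        System.out.println(missingTasks(expected, reported)); // prints [1_1]
    }
}
```

The open question in this thread is where the expected set comes from; the
sketch only shows the comparison once that set is available.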

On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wc...@confluent.io>
wrote:

> Thanks for the follow up Boyang and Guozhang,
>
> I have updated the kip to include these ideas.
>
> Guozhang, that is a good idea about using the TaskMetadata. We can get it
> through the ThreadMetadata with a minor change to `localThreadMetadata` in
> kafkaStreams. This means that we will only need to update TaskMetadata and
> add no other APIs
>
> Boyang, since each TaskMetadata contains the TaskId and TopicPartitions I
> don't believe mapping either way will be a problem. Also I think we can do
> something like record the time the task started idling and when it stops
> idling we can override it to -1. I think that should clear up the first two
> points.
>
> As for your third point I am not sure I 100% understand. The ThreadMetadata
> will contain a set of all task assigned to that thread. Any health check
> service will just need to query all clients and aggregate their responses
> to get a complete picture of all tasks correct?
>
> walker
>
> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Regarding the second API and the `TaskStatus` class: I'd suggest we
> > consolidate on the existing `TaskMetadata` since we have already
> > accumulated a bunch of such classes, and its better to keep them small as
> > public APIs. You can see
> https://issues.apache.org/jira/browse/KAFKA-12370
> > for a reference and a proposal.
> >
> > On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <re...@gmail.com>
> > wrote:
> >
> > > Thanks for the updates Walker. Some replies and follow-up questions:
> > >
> > > 1. I agree one task could have multiple partitions, but when we hit a
> > delay
> > > in terms of offset progress, do we have a convenient way to reverse
> > mapping
> > > TopicPartition to the problematic task? In production, I believe it
> would
> > > be much quicker to identify the problem using task.id instead of topic
> > > partition, especially when it points to an internal topic. I think
> having
> > > the task id as part of the entry value seems useful, which means
> getting
> > > something like Map<TopicPartition, TaskProgress> where TaskProgress
> > > contains both committed offsets & task id.
> > >
> > > 2. The task idling API was still confusing. I don't think we care about
> > the
> > > exact state when making tasksIdling()query, instead we care more about
> > how
> > > long one task has been in idle state since when you called, which
> > reflects
> > > whether it is a normal idling period. So I feel it might be helpful to
> > > track that time difference and report it in the TaskStatus struct.
> > >
> > > 3. What I want to achieve to have some global mapping of either
> > > TopicPartition or TaskId was that it is not possible for a health check
> > > service to report a task failure that doesn't emit any metrics. So as
> > long
> > > as we have a global topic partition API, health check could always be
> > aware
> > > of any task/partition not reporting its progress, does that make sense?
> > If
> > > you feel we have a better way to achieve this, such as querying all the
> > > input/intermediate topic metadata directly from Kafka for the
> baseline, I
> > > think that should be good as well and worth mentioning it in the KIP.
> > >
> > > Also it seems that the KIP hasn't reflected what you proposed for the
> > task
> > > idling status.
> > >
> > > Best,
> > > Boyang
> > >
> > >
> > > On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wc...@confluent.io>
> > > wrote:
> > >
> > > > Thank you for the comments everyone!
> > > >
> > > > I think there are a few things I can clear up in general then I will
> > > > specifically respond to each question.
> > > >
> > > > First, when I say "idling" I refer to task idling. Where the stream
> is
> > > > intentionally not making progress. (
> > > > https://issues.apache.org/jira/browse/KAFKA-10091 is an example).
> This
> > > > becomes relevant if a task is waiting on one partition with no data
> but
> > > > that is holding up a partition with data. That would cause one just
> > > looking
> > > > at the committed offset changes to believe the task has a problem
> when
> > it
> > > > is working as intended.
> > > >
> > > > In light of this confusion. I plan to change tasksIdling() to
> > > `Map<TaskId,
> > > > TaskStatus> getTasksStatus()` this should hopefully make it more
> clear
> > > what
> > > > is being exposed.
> > > >
> > > > TaskStatus would include: TopicPartions, TaskId, ProcessorTopology,
> > > Idling,
> > > > and State.
> > > >
> > > > Boyang:
> > > >
> > > > 2) I think that each task should report on whatever TopicPartitions
> > they
> > > > hold, this means a Topic Partition might get reported twice but the
> > user
> > > > can roll those up and use the larger one when looking at the whole
> app.
> > > >
> > > > 4) If the user collects the committed offsets across all the running
> > > > clients there shouldn't be any tasks missing correct?
> > > >
> > > > 6) Because there is not a 1:1 mapping between Tasks and
> > TopicPartitions I
> > > > think it is cleaner to report them separately.
> > > >
> > > > Guozhang:
> > > >
> > > > 1) Yes, that was my original plan but it made more sense to mirror
> how
> > > the
> > > > consumer exposes the committed offset.
> > > >
> > > > 3) That is a good point. I think that we should include internal
> topics
> > > as
> > > > well. I think that if the topology were to evolve there should be
> fair
> > > > warning anyways. Maybe you can clarify what would be limited by
> > exposing
> > > > the interior topics here? I thought a user could find them in other
> > ways.
> > > > If it is the name we could aynomise them before exposing them.
> > > >
> > > > Thank you all for your comments. If I did not respond directly to one
> > of
> > > > your questions I updated the kip to include the details it was
> > > requesting.
> > > > I didn't not include my proposed changes mentioned earlier as I would
> > > like
> > > > to get some feedback about what to include in TaskStatus and in
> > general.
> > > >
> > > > best,
> > > > Walker
> > > >
> > > > On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Walker, thanks for the KIP. A few thoughts:
> > > > >
> > > > > 1) Have you considered just relying on the `KafkaStreams#metrics()`
> > > that
> > > > > includes embedded consumer metrics that have the committed offsets
> > > > > instead of adding a new API? Not advocating that this is a better
> > > > approach
> > > > > but want to make sure we considered all options before we come to
> the
> > > > "last
> > > > > resort" of adding new public interfaces.
> > > > >
> > > > > 2) The javadoc mentions "tasks assigned to this client", but the
> > > returned
> > > > > map is on partitions. I think we should make the javadoc and the
> > return
> > > > > types consistent, either tasks or topic partitions.
> > > > >
> > > > > 3) In addition, if for 2) above we ended up with topic partitions,
> > then
> > > > > would they include only external source topics, or also including
> > > > internal
> > > > > repartition / changelog topics? I think including only external
> > source
> > > > > topic partitions are not sufficient for your goal of tracking
> > progress,
> > > > but
> > > > > exposing internal topic names are also a big commitment here for
> > future
> > > > > topology evolution.
> > > > >
> > > > > 4)  For "tasksIdling", I'm wondering if we can make it more
> general,
> > > that
> > > > > the returned value is not just a boolean, but a TaskState that can
> be
> > > an
> > > > > enum of "created, restoring, running, idle, closing". This could
> help
> > > us
> > > > in
> > > > > the future to track other things like restoration efficiency and
> > > > rebalance
> > > > > efficiency etc.
> > > > >
> > > > > 5) We need to clarify how is "idling" being defined here: e.g. we
> can
> > > > > clearly state that a task is considered idle only if 1) lag is
> > > > > increasing, indicating that there are indeed new records arrived at
> > > > source,
> > > > > while committed offset is not advancing, AND 2) produced offset
> > > (imagine
> > > > we
> > > > > may have punctuations that generate new data to the output topic
> even
> > > if
> > > > > there's no input for a while) is not advancing either.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
> > > reluctanthero104@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks Walker for the proposed KIP! This should definitely
> empower
> > > > > KStream
> > > > > > users with better visibility.
> > > > > >
> > > > > > Meanwhile I got a couple of questions/suggestions:
> > > > > >
> > > > > >
> > > > > > 1. typo "repost/report" in the motivation section.
> > > > > >
> > > > > > 2. What offsets do we report when the task is under restoration
> or
> > > > > > rebalancing?
> > > > > >
> > > > > > 3. IIUC, we should clearly state that our reported metrics are
> > based
> > > > off
> > > > > > locally assigned tasks for each instance.
> > > > > >
> > > > > > 4. In the meantime, what’s our strategy to report tasks that are
> > not
> > > > > local
> > > > > > to the instance? Users would normally try to monitor all the
> > possible
> > > > > > tasks, and it’s unfortunate we couldn’t determine whether we have
> > > lost
> > > > > > tasks. My brainstorming was whether it makes sense for the leader
> > > > > instance
> > > > > > to report the task progress as -1 for all “supposed to be
> running”
> > > > tasks,
> > > > > > so that on the metrics collector side it could catch any missing
> > > tasks.
> > > > > >
> > > > > > 5. It seems not clear how users should use `isTaskIdling`. Why
> not
> > > > > report a
> > > > > > map/set for idling tasks just as what we did for committed
> offsets?
> > > > > >
> > > > > > 6. Why do we use TopicPartition instead of TaskId as the key in
> the
> > > > > > returned map?
> > > > > > 7. Could we include some details in where we got the commit
> offsets
> > > for
> > > > > > each task? Is it through consumer offset fetch, or the stream
> > > > processing
> > > > > > progress based on the records fetched?
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
> > > wcarlson@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hello all,
> > > > > > >
> > > > > > > I would like to start discussion on KIP-715. This kip aims to
> > make
> > > it
> > > > > > > easier to monitor Kafka Streams progress by exposing the
> > committed
> > > > > offset
> > > > > > > in a similar way as the consumer client does.
> > > > > > >
> > > > > > > Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg
> > > > > > >
> > > > > > > Best,
> > > > > > > Walker
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Walker Carlson <wc...@confluent.io>.
Thanks for the follow-up, Boyang and Guozhang.

I have updated the KIP to include these ideas.

Guozhang, that is a good idea about using the TaskMetadata. We can get it
through the ThreadMetadata with a minor change to `localThreadsMetadata` in
KafkaStreams. This means that we will only need to update TaskMetadata and
add no other APIs.

Boyang, since each TaskMetadata contains the TaskId and TopicPartitions, I
don't believe mapping either way will be a problem. Also, I think we can
record the time the task started idling and reset it to -1 when the task
stops idling. I think that should clear up the first two points.
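
As a rough illustration of the idling timestamp idea, assuming a single
long field that holds the idle start time in milliseconds and -1 otherwise.
`IdleTracker` and its methods are hypothetical and are not the TaskMetadata
API:

```java
// Hypothetical sketch, not the TaskMetadata API: holds the time at which a
// task started idling, or -1 while the task is not idling.
final class IdleTracker {
    static final long NOT_IDLING = -1L;
    private long idleStartMs = NOT_IDLING;

    // Record the start of an idle period; keep the first timestamp if the
    // task is already idling.
    void startIdling(long nowMs) {
        if (idleStartMs == NOT_IDLING) {
            idleStartMs = nowMs;
        }
    }

    // Reset to -1 once the task resumes processing.
    void stopIdling() {
        idleStartMs = NOT_IDLING;
    }

    long idleStartMs() {
        return idleStartMs;
    }

    // How long the task has been idle at nowMs, or 0 if it is not idling.
    long idleDurationMs(long nowMs) {
        return idleStartMs == NOT_IDLING ? 0L : nowMs - idleStartMs;
    }
}
```

A monitoring client could then treat a long idle duration differently from a
stalled committed offset on a task that reports -1.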

As for your third point, I am not sure I 100% understand. The ThreadMetadata
will contain a set of all tasks assigned to that thread. Any health check
service will just need to query all clients and aggregate their responses
to get a complete picture of all tasks, correct?
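
A minimal sketch of that aggregation, assuming the service has already
fetched one set of task ids per client. `TaskViewAggregator` is a
hypothetical name and task ids are modelled as plain strings rather than
TaskId objects:

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka Streams: merges the task ids that
// each client reported into one sorted set.
final class TaskViewAggregator {

    static Set<String> allReportedTasks(Collection<Set<String>> perClientTasks) {
        Set<String> all = new TreeSet<>();
        for (Set<String> tasks : perClientTasks) {
            all.addAll(tasks);
        }
        return all;
    }
}
```

Note that the union only covers tasks that some client actually holds; a task
assigned to no client does not appear in it.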

walker

On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wa...@gmail.com> wrote:

> Regarding the second API and the `TaskStatus` class: I'd suggest we
> consolidate on the existing `TaskMetadata` since we have already
> accumulated a bunch of such classes, and its better to keep them small as
> public APIs. You can see https://issues.apache.org/jira/browse/KAFKA-12370
> for a reference and a proposal.
>
> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <re...@gmail.com>
> wrote:
>
> > Thanks for the updates Walker. Some replies and follow-up questions:
> >
> > 1. I agree one task could have multiple partitions, but when we hit a
> delay
> > in terms of offset progress, do we have a convenient way to reverse
> mapping
> > TopicPartition to the problematic task? In production, I believe it would
> > be much quicker to identify the problem using task.id instead of topic
> > partition, especially when it points to an internal topic. I think having
> > the task id as part of the entry value seems useful, which means getting
> > something like Map<TopicPartition, TaskProgress> where TaskProgress
> > contains both committed offsets & task id.
> >
> > 2. The task idling API was still confusing. I don't think we care about
> the
> > exact state when making tasksIdling()query, instead we care more about
> how
> > long one task has been in idle state since when you called, which
> reflects
> > whether it is a normal idling period. So I feel it might be helpful to
> > track that time difference and report it in the TaskStatus struct.
> >
> > 3. What I want to achieve to have some global mapping of either
> > TopicPartition or TaskId was that it is not possible for a health check
> > service to report a task failure that doesn't emit any metrics. So as
> long
> > as we have a global topic partition API, health check could always be
> aware
> > of any task/partition not reporting its progress, does that make sense?
> If
> > you feel we have a better way to achieve this, such as querying all the
> > input/intermediate topic metadata directly from Kafka for the baseline, I
> > think that should be good as well and worth mentioning it in the KIP.
> >
> > Also it seems that the KIP hasn't reflected what you proposed for the
> task
> > idling status.
> >
> > Best,
> > Boyang
> >
> >
> > On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wc...@confluent.io>
> > wrote:
> >
> > > Thank you for the comments everyone!
> > >
> > > I think there are a few things I can clear up in general then I will
> > > specifically respond to each question.
> > >
> > > First, when I say "idling" I refer to task idling. Where the stream is
> > > intentionally not making progress. (
> > > https://issues.apache.org/jira/browse/KAFKA-10091 is an example). This
> > > becomes relevant if a task is waiting on one partition with no data but
> > > that is holding up a partition with data. That would cause one just
> > looking
> > > at the committed offset changes to believe the task has a problem when
> it
> > > is working as intended.
> > >
> > > In light of this confusion. I plan to change tasksIdling() to
> > `Map<TaskId,
> > > TaskStatus> getTasksStatus()` this should hopefully make it more clear
> > what
> > > is being exposed.
> > >
> > > TaskStatus would include: TopicPartions, TaskId, ProcessorTopology,
> > Idling,
> > > and State.
> > >
> > > Boyang:
> > >
> > > 2) I think that each task should report on whatever TopicPartitions
> they
> > > hold, this means a Topic Partition might get reported twice but the
> user
> > > can roll those up and use the larger one when looking at the whole app.
> > >
> > > 4) If the user collects the committed offsets across all the running
> > > clients there shouldn't be any tasks missing correct?
> > >
> > > 6) Because there is not a 1:1 mapping between Tasks and
> TopicPartitions I
> > > think it is cleaner to report them separately.
> > >
> > > Guozhang:
> > >
> > > 1) Yes, that was my original plan but it made more sense to mirror how
> > the
> > > consumer exposes the committed offset.
> > >
> > > 3) That is a good point. I think that we should include internal topics
> > as
> > > well. I think that if the topology were to evolve there should be fair
> > > warning anyways. Maybe you can clarify what would be limited by
> exposing
> > > the interior topics here? I thought a user could find them in other
> ways.
> > > If it is the name we could aynomise them before exposing them.
> > >
> > > Thank you all for your comments. If I did not respond directly to one
> of
> > > your questions I updated the kip to include the details it was
> > requesting.
> > > I didn't not include my proposed changes mentioned earlier as I would
> > like
> > > to get some feedback about what to include in TaskStatus and in
> general.
> > >
> > > best,
> > > Walker
> > >
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Guozhang Wang <wa...@gmail.com>.
Regarding the second API and the `TaskStatus` class: I'd suggest we
consolidate on the existing `TaskMetadata` since we have already
accumulated a bunch of such classes, and it's better to keep them small as
public APIs. You can see https://issues.apache.org/jira/browse/KAFKA-12370
for a reference and a proposal.

On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <re...@gmail.com>
wrote:

> Thanks for the updates Walker. Some replies and follow-up questions:
>
> 1. I agree one task could have multiple partitions, but when we hit a delay
> in terms of offset progress, do we have a convenient way to map a
> TopicPartition back to the problematic task? In production, I believe it
> would be much quicker to identify the problem using the task id instead of
> the topic partition, especially when it points to an internal topic. I think
> having the task id as part of the entry value would be useful, which means
> getting something like Map<TopicPartition, TaskProgress>, where TaskProgress
> contains both the committed offset and the task id.
>
> 2. The task idling API is still confusing. I don't think we care about the
> exact state when making a tasksIdling() query; instead we care more about how
> long a task has been idle at the time of the call, which reflects
> whether it is a normal idling period. So I feel it might be helpful to
> track that time difference and report it in the TaskStatus struct.
>
> 3. What I wanted to achieve with a global mapping of either TopicPartition
> or TaskId is this: a health check service cannot report the failure of a
> task that doesn't emit any metrics. As long as we have a global topic
> partition API, the health check could always be aware of any task/partition
> not reporting its progress. Does that make sense? If you feel we have a
> better way to achieve this, such as querying all the input/intermediate
> topic metadata directly from Kafka for the baseline, I think that should be
> good as well and worth mentioning in the KIP.
>
> Also it seems that the KIP hasn't reflected what you proposed for the task
> idling status.
>
> Best,
> Boyang
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Boyang Chen <re...@gmail.com>.
Thanks for the updates Walker. Some replies and follow-up questions:

1. I agree one task could have multiple partitions, but when we hit a delay
in terms of offset progress, do we have a convenient way to map a
TopicPartition back to the problematic task? In production, I believe it
would be much quicker to identify the problem using the task id instead of
the topic partition, especially when it points to an internal topic. I think
having the task id as part of the entry value would be useful, which means
getting something like Map<TopicPartition, TaskProgress>, where TaskProgress
contains both the committed offset and the task id.

2. The task idling API is still confusing. I don't think we care about the
exact state when making a tasksIdling() query; instead we care more about how
long a task has been idle at the time of the call, which reflects
whether it is a normal idling period. So I feel it might be helpful to
track that time difference and report it in the TaskStatus struct.

3. What I wanted to achieve with a global mapping of either TopicPartition
or TaskId is this: a health check service cannot report the failure of a
task that doesn't emit any metrics. As long as we have a global topic
partition API, the health check could always be aware of any task/partition
not reporting its progress. Does that make sense? If you feel we have a
better way to achieve this, such as querying all the input/intermediate
topic metadata directly from Kafka for the baseline, I think that should be
good as well and worth mentioning in the KIP.
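
A minimal sketch of the shape suggested in point 1 and the check described
in point 3, in plain Java. TaskProgress is the name used above, but its
fields, the ProgressCheck helper, and the string keys standing in for
TopicPartition and TaskId are assumptions made for illustration; none of
this is Kafka Streams API.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical value type: committed offset plus the id of the owning task.
final class TaskProgress {
    final String taskId;          // stand-in for TaskId, e.g. "0_1"
    final long committedOffset;

    TaskProgress(String taskId, long committedOffset) {
        this.taskId = taskId;
        this.committedOffset = committedOffset;
    }
}

// Hypothetical helper for a health check that compares two snapshots.
final class ProgressCheck {
    // Task ids whose committed offset did not advance between two snapshots.
    // Keys are "topic-partition" strings standing in for TopicPartition.
    static Set<String> stalledTasks(Map<String, TaskProgress> previous,
                                    Map<String, TaskProgress> current) {
        Set<String> stalled = new TreeSet<>();
        for (Map.Entry<String, TaskProgress> entry : current.entrySet()) {
            TaskProgress before = previous.get(entry.getKey());
            if (before != null
                    && entry.getValue().committedOffset <= before.committedOffset) {
                stalled.add(entry.getValue().taskId);
            }
        }
        return stalled;
    }

    // Partitions in the expected baseline that no instance reported.
    static Set<String> unreported(Set<String> expected,
                                  Map<String, TaskProgress> reported) {
        Set<String> missing = new TreeSet<>(expected);
        missing.removeAll(reported.keySet());
        return missing;
    }
}
```

Because the task id travels with each entry, a stalled partition maps back
to its task without a separate lookup. A stalled offset alone does not
distinguish a stuck task from one that is idling on purpose.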

Also it seems that the KIP hasn't reflected what you proposed for the task
idling status.

Best,
Boyang


On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wc...@confluent.io>
wrote:

> Thank you for the comments everyone!
>
> I think there are a few things I can clear up in general, then I will
> specifically respond to each question.
>
> First, when I say "idling" I refer to task idling, where the stream is
> intentionally not making progress
> (https://issues.apache.org/jira/browse/KAFKA-10091 is an example). This
> becomes relevant if a task is waiting on one partition with no data, and
> that is holding up a partition with data. Someone looking only at the
> committed offset changes would then believe the task has a problem when it
> is working as intended.
>
> In light of this confusion, I plan to change tasksIdling() to `Map<TaskId,
> TaskStatus> getTasksStatus()`. This should hopefully make it clearer what
> is being exposed.
>
> TaskStatus would include: TopicPartitions, TaskId, ProcessorTopology, Idling,
> and State.
>
> Boyang:
>
> 2) I think that each task should report on whatever TopicPartitions it
> holds. This means a TopicPartition might get reported twice, but the user
> can roll those up and use the larger one when looking at the whole app.
>
> 4) If the user collects the committed offsets across all the running
> clients, there shouldn't be any tasks missing, correct?
>
> 6) Because there is not a 1:1 mapping between Tasks and TopicPartitions I
> think it is cleaner to report them separately.
>
> Guozhang:
>
> 1) Yes, that was my original plan but it made more sense to mirror how the
> consumer exposes the committed offset.
>
> 3) That is a good point. I think that we should include internal topics as
> well. I think that if the topology were to evolve there should be fair
> warning anyways. Maybe you can clarify what would be limited by exposing
> the internal topics here? I thought a user could find them in other ways.
> If it is the name, we could anonymize them before exposing them.
>
> Thank you all for your comments. If I did not respond directly to one of
> your questions, I updated the KIP to include the details it was requesting.
> I did not include my proposed changes mentioned earlier, as I would like
> to get some feedback about what to include in TaskStatus and in general.
>
> best,
> Walker
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Walker Carlson <wc...@confluent.io>.
Thank you for the comments everyone!

I think there are a few things I can clear up in general, then I will
specifically respond to each question.

First, when I say "idling" I refer to task idling, where the stream is
intentionally not making progress
(https://issues.apache.org/jira/browse/KAFKA-10091 is an example). This
becomes relevant if a task is waiting on one partition with no data, and
that is holding up a partition with data. Someone looking only at the
committed offset changes would then believe the task has a problem when it
is working as intended.

In light of this confusion, I plan to change tasksIdling() to `Map<TaskId,
TaskStatus> getTasksStatus()`. This should hopefully make it clearer what
is being exposed.

TaskStatus would include: TopicPartitions, TaskId, ProcessorTopology, Idling,
and State.
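
To make the proposed shape concrete, here is a rough sketch of a TaskStatus
value with the five fields listed above. The field types are stand-ins
(strings instead of TaskId, TopicPartition and ProcessorTopology), the State
values are assumed, and TaskStatusView is a hypothetical helper; this is an
illustration, not the KIP's final API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical shape of the proposed TaskStatus; field types are stand-ins.
final class TaskStatus {
    enum State { CREATED, RESTORING, RUNNING, CLOSING } // assumed values

    final String taskId;                // stand-in for TaskId
    final Set<String> topicPartitions;  // stand-in for Set<TopicPartition>
    final String topology;              // stand-in for ProcessorTopology
    final boolean idling;
    final State state;

    TaskStatus(String taskId, Set<String> topicPartitions, String topology,
               boolean idling, State state) {
        this.taskId = taskId;
        this.topicPartitions =
                Collections.unmodifiableSet(new TreeSet<>(topicPartitions));
        this.topology = topology;
        this.idling = idling;
        this.state = state;
    }
}

// Hypothetical consumer of a Map<TaskId, TaskStatus>-like result.
final class TaskStatusView {
    // Ids of tasks that are intentionally idling, so a monitor can skip them
    // when flagging committed offsets that have stopped advancing.
    static Set<String> idlingTaskIds(Map<String, TaskStatus> tasksStatus) {
        Set<String> ids = new TreeSet<>();
        for (TaskStatus status : tasksStatus.values()) {
            if (status.idling) {
                ids.add(status.taskId);
            }
        }
        return ids;
    }
}
```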

Boyang:

2) I think that each task should report on whatever TopicPartitions it
holds. This means a TopicPartition might get reported twice, but the user
can roll those up and use the larger one when looking at the whole app.

4) If the user collects the committed offsets across all the running
clients, there shouldn't be any tasks missing, correct?

6) Because there is not a 1:1 mapping between Tasks and TopicPartitions I
think it is cleaner to report them separately.
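
The roll-up mentioned in 2) could look roughly like the following. This is
a sketch under assumptions: OffsetRollup is a hypothetical user-side helper,
and "topic-partition" strings stand in for TopicPartition keys.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical user-side helper, not part of Kafka Streams: merges the
// committed-offset maps collected from each client, keeping the largest
// offset seen for every partition.
final class OffsetRollup {
    static Map<String, Long> rollUp(List<Map<String, Long>> perClient) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> offsets : perClient) {
            // A partition reported by more than one client keeps the larger offset.
            offsets.forEach((tp, offset) -> merged.merge(tp, offset, Long::max));
        }
        return merged;
    }
}
```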

Guozhang:

1) Yes, that was my original plan but it made more sense to mirror how the
consumer exposes the committed offset.

3) That is a good point. I think that we should include internal topics as
well. I think that if the topology were to evolve there should be fair
warning anyways. Maybe you can clarify what would be limited by exposing
the internal topics here? I thought a user could find them in other ways.
If it is the name, we could anonymize them before exposing them.

Thank you all for your comments. If I did not respond directly to one of
your questions, I updated the KIP to include the details it was requesting.
I did not include my proposed changes mentioned earlier, as I would like
to get some feedback about what to include in TaskStatus and in general.

best,
Walker

On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Walker, thanks for the KIP. A few thoughts:
>
> 1) Have you considered just relying on the `KafkaStreams#metrics()` that
> includes embedded consumer metrics that have the committed offsets
> instead of adding a new API? Not advocating that this is a better approach
> but want to make sure we considered all options before we come to the "last
> resort" of adding new public interfaces.
>
> 2) The javadoc mentions "tasks assigned to this client", but the returned
> map is on partitions. I think we should make the javadoc and the return
> types consistent, either tasks or topic partitions.
>
> 3) In addition, if for 2) above we ended up with topic partitions, then
> would they include only external source topics, or also internal
> repartition / changelog topics? I think including only external source
> topic partitions is not sufficient for your goal of tracking progress, but
> exposing internal topic names is also a big commitment here for future
> topology evolution.
>
> 4)  For "tasksIdling", I'm wondering if we can make it more general, that
> the returned value is not just a boolean, but a TaskState that can be an
> enum of "created, restoring, running, idle, closing". This could help us in
> the future to track other things like restoration efficiency and rebalance
> efficiency etc.
>
> 5) We need to clarify how "idling" is defined here: e.g. we can
> clearly state that a task is considered idle only if 1) lag is
> increasing, indicating that new records are indeed arriving at the source,
> while the committed offset is not advancing, AND 2) the produced offset
> (imagine we may have punctuations that generate new data to the output
> topic even if there's no input for a while) is not advancing either.
>
>
> Guozhang
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Walker, thanks for the KIP. A few thoughts:

1) Have you considered just relying on the `KafkaStreams#metrics()` that
includes embedded consumer metrics that have the committed offsets
instead of adding a new API? Not advocating that this is a better approach
but want to make sure we considered all options before we come to the "last
resort" of adding new public interfaces.

2) The javadoc mentions "tasks assigned to this client", but the returned
map is on partitions. I think we should make the javadoc and the return
types consistent, either tasks or topic partitions.

3) In addition, if for 2) above we ended up with topic partitions, then
would they include only external source topics, or also internal
repartition / changelog topics? I think including only external source
topic partitions is not sufficient for your goal of tracking progress, but
exposing internal topic names is also a big commitment here for future
topology evolution.

4)  For "tasksIdling", I'm wondering if we can make it more general, that
the returned value is not just a boolean, but a TaskState that can be an
enum of "created, restoring, running, idle, closing". This could help us in
the future to track other things like restoration efficiency and rebalance
efficiency etc.

5) We need to clarify how "idling" is defined here: e.g. we can
clearly state that a task is considered idle only if 1) lag is
increasing, indicating that new records are indeed arriving at the source,
while the committed offset is not advancing, AND 2) the produced offset
(imagine we may have punctuations that generate new data to the output
topic even if there's no input for a while) is not advancing either.
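
To illustrate points 4) and 5), here is a small sketch of the suggested
enum and of the two-part idle definition, expressed as a comparison of two
snapshots of the same task. TaskSnapshot and IdleCheck are hypothetical
names, and using the source end offset to derive lag is an assumption; only
the enum values and the two conditions come from the text above.

```java
// Enum values as listed in point 4.
enum TaskState { CREATED, RESTORING, RUNNING, IDLE, CLOSING }

// Hypothetical snapshot of one task's progress at a point in time.
final class TaskSnapshot {
    final long endOffset;        // latest offset available at the source
    final long committedOffset;  // last committed input offset
    final long producedOffset;   // latest offset written to the output topic

    TaskSnapshot(long endOffset, long committedOffset, long producedOffset) {
        this.endOffset = endOffset;
        this.committedOffset = committedOffset;
        this.producedOffset = producedOffset;
    }

    long lag() {
        return endOffset - committedOffset;
    }
}

final class IdleCheck {
    // Idle only if 1) lag grew while the committed offset did not advance,
    // AND 2) the produced offset did not advance either.
    static boolean isIdle(TaskSnapshot earlier, TaskSnapshot later) {
        boolean lagIncreasing = later.lag() > earlier.lag();
        boolean committedStuck = later.committedOffset <= earlier.committedOffset;
        boolean producedStuck = later.producedOffset <= earlier.producedOffset;
        return lagIncreasing && committedStuck && producedStuck;
    }
}
```

Under this definition a task whose punctuations still write output is not
reported as idle, even when its input offsets stand still.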


Guozhang



On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <re...@gmail.com>
wrote:

> Thanks Walker for the proposed KIP! This should definitely empower KStream
> users with better visibility.
>
> Meanwhile I got a couple of questions/suggestions:
>
>
> 1. typo "repost/report" in the motivation section.
>
> 2. What offsets do we report when the task is under restoration or
> rebalancing?
>
> 3. IIUC, we should clearly state that our reported metrics are based off
> locally assigned tasks for each instance.
>
> 4. In the meantime, what’s our strategy to report tasks that are not local
> to the instance? Users would normally try to monitor all the possible
> tasks, and it’s unfortunate we couldn’t determine whether we have lost
> tasks. My brainstorming was whether it makes sense for the leader instance
> to report the task progress as -1 for all “supposed to be running” tasks,
> so that on the metrics collector side it could catch any missing tasks.
>
> 5. It is not clear how users should use `isTaskIdling`. Why not report a
> map/set of idling tasks, just as we did for committed offsets?
>
> 6. Why do we use TopicPartition instead of TaskId as the key in the
> returned map?
>
> 7. Could we include some details on where we get the committed offsets for
> each task? Is it through a consumer offset fetch, or the stream processing
> progress based on the records fetched?
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-715: Expose Committed offset in streams

Posted by Boyang Chen <re...@gmail.com>.
Thanks Walker for the proposed KIP! This should definitely empower KStream
users with better visibility.

Meanwhile I got a couple of questions/suggestions:


1. typo "repost/report" in the motivation section.

2. What offsets do we report when the task is under restoration or
rebalancing?

3. IIUC, we should clearly state that our reported metrics are based off
locally assigned tasks for each instance.

4. In the meantime, what’s our strategy to report tasks that are not local
to the instance? Users would normally try to monitor all the possible
tasks, and it’s unfortunate we couldn’t determine whether we have lost
tasks. My brainstorming was whether it makes sense for the leader instance
to report the task progress as -1 for all “supposed to be running” tasks,
so that on the metrics collector side it could catch any missing tasks.

5. It is not clear how users should use `isTaskIdling`. Why not report a
map/set of idling tasks, just as we did for committed offsets?

6. Why do we use TopicPartition instead of TaskId as the key in the
returned map?

7. Could we include some details on where we get the committed offsets for
each task? Is it through a consumer offset fetch, or the stream processing
progress based on the records fetched?
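
The idea in point 4 can be sketched as follows: start every task that is
supposed to be running at -1, overlay whatever progress was actually
reported, and treat anything still at -1 as missing. MissingTaskReport and
the string task ids are hypothetical and only illustrate the brainstormed
behaviour; nothing like this exists in Kafka Streams.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical collector-side view of the "-1 baseline" idea.
final class MissingTaskReport {
    static final long NOT_REPORTED = -1L;

    // Every expected task starts at -1; reported progress overwrites it.
    static Map<String, Long> withBaseline(Set<String> expectedTaskIds,
                                          Map<String, Long> reported) {
        Map<String, Long> merged = new TreeMap<>();
        for (String taskId : expectedTaskIds) {
            merged.put(taskId, NOT_REPORTED);
        }
        merged.putAll(reported);
        return merged;
    }

    // Tasks still at -1 were expected to run but reported no progress.
    static Set<String> missing(Map<String, Long> merged) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Long> entry : merged.entrySet()) {
            if (entry.getValue() == NOT_REPORTED) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```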


On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <wc...@confluent.io>
wrote:

> Hello all,
>
> I would like to start discussion on KIP-715. This kip aims to make it
> easier to monitor Kafka Streams progress by exposing the committed offset
> in a similar way as the consumer client does.
>
> Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg
>
> Best,
> Walker
>