Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2017/03/25 23:11:02 UTC

Fwd: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API

Forwarding Matthias's reply to the channel (I think he meant to do that but
replied only to me).

---------- Forwarded message ----------
From: Matthias J. Sax <ma...@confluent.io>
Date: Thu, Mar 23, 2017 at 4:30 PM
Subject: Re: [DISCUSS] KIP 130: Expose states of active tasks to
KafkaStreams public API
To: Guozhang Wang <wa...@gmail.com>


Thanks for the progress on this KIP. I think we are on the right path!

Couple of comments/questions:

(1) Why do we not consider the "rejected alternative" of adding the method
to KafkaStreams? The comment on #streamThreads() says:

"Note this method will return <code>null</code> if called on {@link
StreamsMetadata} which represent a remote application."

Thus, since we cannot get any remote metadata anyway, it seems more
straightforward to add the method to KafkaStreams directly; this would
avoid invalid calls and a `null` return value in the first place.
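To make point (1) concrete, here is a minimal Java sketch of the two API shapes. All types and methods in it (`ThreadInfo`, `InstanceMetadata`, `LocalClient`, `streamThreads()`, `localThreads()`) are hypothetical stand-ins, not the real Kafka Streams classes, and it assumes Java 16+ for records. When thread details hang off per-instance metadata, a remote instance forces a `null` return; a method that exists only on the local client object has no remote case at all.

```java
import java.util.List;

// Hypothetical model of the two API shapes discussed in point (1).
// None of these type or method names are real Kafka Streams API.
public class ThreadsApiShapes {

    // Hypothetical per-thread metadata: a thread name and a state label.
    record ThreadInfo(String threadName, String state) {}

    // Shape A: thread info hangs off per-instance metadata. Remote instances
    // carry no thread details, so the accessor has to return null for them.
    record InstanceMetadata(String hostPort, boolean isLocal, List<ThreadInfo> threads) {
        List<ThreadInfo> streamThreads() {
            return isLocal ? threads : null; // every caller must null-check
        }
    }

    // Shape B: thread info is reachable only from the local client object,
    // so there is no remote case and no null to handle.
    static final class LocalClient {
        private final List<ThreadInfo> threads;

        LocalClient(List<ThreadInfo> threads) {
            this.threads = List.copyOf(threads);
        }

        List<ThreadInfo> localThreads() {
            return threads;
        }
    }

    public static void main(String[] args) {
        List<ThreadInfo> threads = List.of(new ThreadInfo("StreamThread-1", "RUNNING"));
        InstanceMetadata remote = new InstanceMetadata("host2:7070", false, List.of());
        InstanceMetadata local = new InstanceMetadata("host1:7070", true, threads);
        LocalClient client = new LocalClient(threads);

        System.out.println(remote.streamThreads());               // null
        System.out.println(local.streamThreads().size());         // 1
        System.out.println(client.localThreads().get(0).state()); // RUNNING
    }
}
```

The design trade-off is visible in the types: Shape A makes every caller handle a `null` that only exists because the method is reachable on remote metadata, while Shape B rules that call out at compile time.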

I like the idea of exposing sub-topologies:

(2a) I would recommend renaming `topicsGroupId` to `subTopologyId` :)

(2b) We could add this to KIP-120 already. However, I would not just
link both via name, but rather leverage KIP-120 directly and add a
"Subtopology" member to the TaskMetadata class.


Overall, I like the distinction of KIP-120 only exposing "static"
information that can be determined before the topology gets started,
while this KIP allows access to runtime information.
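A minimal Java sketch of suggestion (2b) and of the static/runtime split described above: runtime task metadata holds a direct reference to the static sub-topology description, so nothing has to be parsed out of a name. The `Subtopology` and `TaskMetadata` records here are hypothetical illustrations (not the Kafka Streams classes), the task id string and topic name are made up, and records require Java 16+.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of point (2b): runtime task metadata that holds a
// reference to the static sub-topology description. Not real Kafka Streams API.
public class TaskMetadataWithSubtopology {

    // Static information that can be known before the topology starts (KIP-120 side).
    record Subtopology(int id, List<String> sourceTopics) {}

    // Runtime information available only after start (KIP-130 side),
    // linked to its sub-topology by object reference rather than by name.
    record TaskMetadata(String taskId, Subtopology subtopology, Set<String> assignedPartitions) {}

    static String describe(TaskMetadata task) {
        return "task " + task.taskId()
                + " runs sub-topology " + task.subtopology().id()
                + " reading " + task.subtopology().sourceTopics();
    }

    public static void main(String[] args) {
        Subtopology sub0 = new Subtopology(0, List.of("orders"));
        // The task id string and topic name here are illustrative only.
        TaskMetadata task = new TaskMetadata("0_1", sub0, Set.of("orders-1"));
        System.out.println(describe(task)); // task 0_1 runs sub-topology 0 reading [orders]
    }
}
```

Holding the reference means a monitoring UI can jump from a running task straight to its static description, instead of reconstructing the link from the task id text.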



-Matthias


On 3/22/17 12:42 PM, Guozhang Wang wrote:
> Thanks for the updated KIP, and sorry for the late replies!
>
> I thought a bit more about KIP-130, and I feel that it may be okay if we
> are going to deprecate the `toString` function in favor of the proposed
> APIs (it is not explicitly said in the KIP, so I'm not sure whether you
> plan to keep `KafkaStreams#toString` as is or to replace it with the
> proposed APIs). More specifically, after both KIP-120 and KIP-130:
>
> 1. users can use the `#describe` function to check the generated topology
> before calling `KafkaStreams#start`, which is static information.
> 2. users can use `StreamsMetadata -> ThreadMetadata -> TaskMetadata`
> programmatically after calling `KafkaStreams#start` to get the
> dynamically changing information.
>
> One thing I'm still not sure about, though, is that in `TaskMetadata` we
> only have the TaskId and assigned partitions, whereas the
> "TopologyDescription" introduced in KIP-120 will simply describe the
> whole topology, possibly composed of multiple sub-topologies. So it is
> hard for users to tell on the fly which sub-topology is executed under
> which task.
>
> Hence I'm wondering whether we can expose the "sub-topology-id" (named
> topicsGroupId internally) in TopologyDescription#Subtopology; then, from
> the task id, which is essentially "sub-topology-id DASH
> partition-group-id", users can make the link, though it is still not that
> straightforward.
>
> Thoughts?
>
> Guozhang
>
>
>
> On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois
> <fhussonnois@gmail.com> wrote:
>
>     Thanks Guozhang for pointing me to KIP-120.
>
>     I've made some modifications to the KIP. I also proposed a new PR
>     (there are still some tests to write).
>     https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>
>     Exposing consumed offsets through JMX is sufficient for debugging
>     purposes. But I think this could be part of another JIRA, as there is
>     no impact on the public API.
>
>     Thanks
>
>     2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangguoz@gmail.com>:
>
>     > Hello Florian,
>     >
>     > As for programmatically discovering monitoring data by piping metrics
>     > into a dedicated topic: I think you can actually use a
>     > KafkaMetricsReporter which pipes the polled metric values into a
>     > pre-defined topic (note that in Kafka the MetricsReporter is simply an
>     > interface and users can build their own impl in addition to the
>     > JMXReporter), for example:
>     >
>     > https://github.com/krux/kafka-metrics-reporter
>     >
>     > As for the "static task-level assignment", what I meant is that the
>     > mapping from source-topic-partitions -> tasks is static, via the
>     > "PartitionGrouper", and a task won't switch from an active task to a
>     > standby task. Rather, an active task could be migrated, as a whole
>     > along with all its assigned partitions, to another thread / process,
>     > and a new standby task will be created on the host that this active
>     > task is migrating from. So for the SAME task,
>     > taskMetadata.assignedPartitions() will always return the same
>     > partitions.
>     >
>     > As for the `toString` function that we have today, I feel it has some
>     > correlation with KIP-120, so I'm trying to coordinate some discussions
>     > here (cc'ing Matthias as the owner of KIP-120). My understanding is
>     > that:
>     >
>     > 1. In KIP-120, the `toString` function of `KafkaStreams` will be
>     > removed, and instead the `Topology#describe` function will be
>     > introduced for users to debug the topology BEFORE they start running
>     > their instance with it. Hence the description won't contain any task
>     > information, as tasks are not formed yet.
>     > 2. In KIP-130, we want to add the task-level information for
>     > monitoring purposes, which is not static and can only be captured
>     > AFTER the instance has started running. Again, I'm wondering whether,
>     > for KIP-130 alone, adding the metrics mentioned in my previous email
>     > would suffice even for the use case that you have mentioned.
>     >
>     >
>     > Guozhang
>     >
>     > On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois
>     > <fhussonnois@gmail.com> wrote:
>     >
>     > > Hi Guozhang
>     > >
>     > > Thank you for your feedback. I've started to look more deeply into
>     > > the code. As you mention, it would be smarter to use the current
>     > > StreamsMetadata API to expose this information.
>     > >
>     > > I think exposing metrics through JMX is great for building
>     > > monitoring dashboards using tools like jmxtrans and Grafana.
>     > > But for our use case we would like to expose the states directly
>     > > from the application embedding the Kafka Streams topologies.
>     > > So we expect to be able to retrieve states programmatically.
>     > >
>     > > For instance, we could imagine producing those states into a
>     > > dedicated topic. That way, a third application could automatically
>     > > discover all Kafka Streams applications that can be monitored.
>     > > In a production environment, that could clearly be a way to get a
>     > > complete overview of a microservices architecture based on Kafka
>     > > Streams.
>     > >
>     > > The toString() method gives a lot of information, but it can only be
>     > > used for debugging purposes, not to build a topology visualization
>     > > tool. Could we actually expose the same details about the stream
>     > > topology from the StreamsMetadata API? That is, could the
>     > > TaskMetadata class you have suggested contain information similar to
>     > > what the toString method of the AbstractTask class returns?
>     > >
>     > > I can update the KIP accordingly.
>     > >
>     > > Finally, I'm not sure I understand your last point: "Note that the
>     > > task-level assignment information is static, i.e. it will not change
>     > > during the runtime"
>     > >
>     > > Does that mean that when a rebalance occurs, new tasks are created
>     > > for the new assignments and old ones just switch to a standby state?
>     > >
>     > > Thanks,
>     > >
>     > > 2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangguoz@gmail.com>:
>     > >
>     > > > Hello Florian,
>     > > >
>     > > > Thanks for the KIP and your detailed explanation of your use case.
>     > > > I think there are two dimensions to discuss on how to improve
>     > > > Streams' debuggability (or, more specifically, state exposure for
>     > > > visualization).
>     > > >
>     > > > The first question is "what information should we expose to the
>     > > > user". From your KIP I saw generally three categories:
>     > > >
>     > > > 1. The state of the thread within a process: as you mentioned,
>     > > > currently we only expose the state of the process but not the
>     > > > finer-grained per-thread state.
>     > > > 2. The state of the task. Currently the closest API to this is
>     > > > StreamsMetadata; however, it aggregates the tasks across all threads
>     > > > and only presents the aggregated set of assigned partitions / state
>     > > > stores etc. We can consider extending it with a new
>     > > > StreamsMetadata#tasks() which returns TaskMetadata with similar
>     > > > fields; StreamsMetadata.stateStoreNames / etc. would still return
>     > > > the aggregated results, but users could "drill down" if they want.
>     > > >
>     > > > The second question is "how should we expose them to the user". For
>     > > > example, you mentioned consumedOffsetsByPartition in the
>     > > > activeTasks. We could add this as a JMX metric based on fetch
>     > > > positions inside the consumer layer (note that Streams just embeds
>     > > > consumers), or we could consider adding it to TaskMetadata. Either
>     > > > way, it can be visualized for monitoring. The reason we expose
>     > > > StreamsMetadata as well as State is that they are expected to be
>     > > > "polled" programmatically for interactive queries and also for
>     > > > control flows (e.g. I would like to start running my other topology
>     > > > ONLY after the first topology is up and running), while in your case
>     > > > the main purpose seems to be querying them continuously for
>     > > > monitoring etc. Personally, I'd prefer to expose them only via JMX
>     > > > for such purposes, to keep the API simpler.
>     > > >
>     > > > So given your current motivations, I'd suggest exposing the
>     > > > following information as newly added metrics in Streams:
>     > > >
>     > > > 1. Thread-level state metric.
>     > > > 2. Task-level hosted client identifier metric (e.g. host:port).
>     > > > 3. Consumer-level per-topic/partition position metric
>     > > > (https://kafka.apache.org/documentation/#topic_fetch_monitoring).
>     > > >
>     > > > Note that the task-level assignment information is static, i.e. it
>     > > > will not change during the runtime at all, and can already be
>     > > > accessed from the `toString()` function even before the instance
>     > > > starts running, so I think this piece of information does not need
>     > > > to be exposed through JMX.
>     > > >
>     > > > WDYT?
>     > > >
>     > > > Guozhang
>     > > >
>     > > >
>     > > > On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian.guy@gmail.com>
>     > > > wrote:
>     > > >
>     > > > > Hi Florian,
>     > > > >
>     > > > > Thanks for the KIP.
>     > > > >
>     > > > > It seems there is some overlap here with what we already have in
>     > > > > KafkaStreams.allMetadata(). This currently returns a
>     > > > > Collection<StreamsMetadata> where each StreamsMetadata instance
>     > > > > holds the state stores and partition assignment for every instance
>     > > > > of the KafkaStreams application. I'm wondering whether that is good
>     > > > > enough for what you are trying to achieve. If not, could it be
>     > > > > modified to include the per-thread assignment?
>     > > > >
>     > > > > Thanks,
>     > > > > Damian
>     > > > >
>     > > > > On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois
>     > > > > <fhussonnois@gmail.com> wrote:
>     > > > >
>     > > > > > Hi Matthias,
>     > > > > >
>     > > > > > First, I will answer your last question.
>     > > > > >
>     > > > > > The main reason to have both TaskState#assignment and
>     > > > > > TaskState#consumedOffsetsByPartition is that tasks have no
>     > > > > > consumed offsets until at least one message is consumed for each
>     > > > > > partition, even if previous offsets exist for the consumer group.
>     > > > > > So yes, these methods are redundant, as they only diverge at
>     > > > > > application startup.
>     > > > > >
>     > > > > > About the use case: we are currently developing for a customer
>     > > > > > a small framework based on KafkaStreams which transforms /
>     > > > > > denormalizes data before ingesting it into Hadoop.
>     > > > > >
>     > > > > > We have a cluster of workers (Spring Boot) which instantiate
>     > > > > > KStreams topologies dynamically based on dataflow configurations.
>     > > > > > Each configuration describes a topic to consume and how to process
>     > > > > > messages (this looks like the NiFi processors API).
>     > > > > >
>     > > > > > Our architecture is inspired by Kafka Connect. We have topics for
>     > > > > > configs and states which are consumed by each worker (actually we
>     > > > > > have reused some internal classes from the Connect API).
>     > > > > >
>     > > > > > Now, we would like to develop UIs to visualize the topics and
>     > > > > > partitions consumed by our worker applications.
>     > > > > >
>     > > > > > Also, I think it would be nice to be able, in the future, to
>     > > > > > develop web UIs similar to Spark's but for KafkaStreams to
>     > > > > > visualize DAGs... so maybe this KIP is just a first step.
>     > > > > >
>     > > > > > Thanks,
>     > > > > >
>     > > > > > 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matthias@confluent.io>:
>     > > > > >
>     > > > > > > Thanks for the KIP.
>     > > > > > >
>     > > > > > > I am wondering a little bit why you need to expose this
>     > > > > > > information. Can you describe some use cases?
>     > > > > > >
>     > > > > > > Would it be worth unifying this new API with KafkaStreams#state()
>     > > > > > > to get the overall state of an application without the need to
>     > > > > > > call two different methods? I'm not sure what this unified API
>     > > > > > > might look like, though.
>     > > > > > >
>     > > > > > >
>     > > > > > > One minor comment about the API: TaskState#assignment seems to
>     > > > > > > be redundant. It should be the same as
>     > > > > > > TaskState#consumedOffsetsByPartition.keySet()
>     > > > > > >
>     > > > > > > Or am I missing something?
>     > > > > > >
>     > > > > > >
>     > > > > > > -Matthias
>     > > > > > >
>     > > > > > > On 3/1/17 5:19 AM, Florian Hussonnois wrote:
>     > > > > > > > Hi Eno,
>     > > > > > > >
>     > > > > > > > Yes, but the state() method only returns the global state of
>     > > > > > > > the KafkaStreams application (i.e. CREATED, RUNNING,
>     > > > > > > > REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).
>     > > > > > > >
>     > > > > > > > An alternative to this KIP would be to change this method to
>     > > > > > > > return more information instead of adding a new method.
>     > > > > > > >
>     > > > > > > > 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.thereska@gmail.com>:
>     > > > > > > >
>     > > > > > > >> Thanks Florian,
>     > > > > > > >>
>     > > > > > > >> Have you had a chance to look at the new state methods in
>     > > > > > > >> 0.10.2, e.g., KafkaStreams.state()?
>     > > > > > > >>
>     > > > > > > >> Thanks
>     > > > > > > >> Eno
>     > > > > > > >>> On 1 Mar 2017, at 11:54, Florian Hussonnois
>     > > > > > > >>> <fhussonnois@gmail.com> wrote:
>     > > > > > > >>>
>     > > > > > > >>> Hi all,
>     > > > > > > >>>
>     > > > > > > >>> I have just created KIP-130 to add a new method to the
>     > > > > > > >>> KafkaStreams API in order to expose the states of threads and
>     > > > > > > >>> active tasks.
>     > > > > > > >>>
>     > > > > > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>     > > > > > > >>>
>     > > > > > > >>>
>     > > > > > > >>> Thanks,
>     > > > > > > >>>
>     > > > > > > >>> --
>     > > > > > > >>> Florian HUSSONNOIS
>     > > > > > > >>
>     > > > > > > >>
>     > > > > > > >
>     > > > > > > >
>     > > > > > >
>     > > > > > >
>     > > > > >
>     > > > > >
>     > > > > > --
>     > > > > > Florian HUSSONNOIS
>     > > > > >
>     > > > >
>     > > >
>     > > >
>     > > >
>     > > > --
>     > > > -- Guozhang
>     > > >
>     > >
>     > >
>     > >
>     > > --
>     > > Florian HUSSONNOIS
>     > >
>     >
>     >
>     >
>     > --
>     > -- Guozhang
>     >
>
>
>
>     --
>     Florian HUSSONNOIS
>
>
>
>
> --
> -- Guozhang




-- 
-- Guozhang