Posted to dev@kafka.apache.org by Jorge Esteban Quilcate Otoya <qu...@gmail.com> on 2018/09/15 17:31:14 UTC

Accessing Topology Builder

Hi everyone,

I'm experimenting with how to add tracing to Kafka Streams.

One option is to override and access
`InternalTopologyBuilder#addProcessor`. Currently this method is final,
and the builder is not exposed as part of `StreamsBuilder`:

```
public class StreamsBuilder {

    /** The actual topology that is constructed by this StreamsBuilder. */
    private final Topology topology = new Topology();

    /** The topology's internal builder. */
    final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;

    private final InternalStreamsBuilder internalStreamsBuilder =
        new InternalStreamsBuilder(internalTopologyBuilder);
```

The goal is that, if `builder#addProcessor` were exposed, we could decorate
every `ProcessorSupplier` and capture traces from it:

```
@Override
public void addProcessor(String name, ProcessorSupplier supplier, String... predecessorNames) {
    super.addProcessor(name, new TracingProcessorSupplier(tracer, name, supplier), predecessorNames);
}
```
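
To illustrate the decoration idea without Kafka on the classpath, here is a
minimal, self-contained sketch. `Processor`, `ProcessorSupplier`,
`SimpleTopologyBuilder` and `TracingTopologyBuilder` below are simplified
stand-ins written for this example (they are not the Kafka Streams types), and
a plain `List<String>` stands in for the tracer:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Kafka Streams types (assumptions, not the real API).
interface Processor<K, V> {
    void process(K key, V value);
}

interface ProcessorSupplier<K, V> {
    Processor<K, V> get();
}

// Decorator: every Processor handed out by the delegate is wrapped so that
// a "span" is recorded around each process() call.
class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
    private final List<String> spans; // stand-in for a real tracer
    private final String name;
    private final ProcessorSupplier<K, V> delegate;

    TracingProcessorSupplier(List<String> spans, String name,
                             ProcessorSupplier<K, V> delegate) {
        this.spans = spans;
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public Processor<K, V> get() {
        Processor<K, V> inner = delegate.get();
        return (key, value) -> {
            spans.add("start " + name);
            try {
                inner.process(key, value);
            } finally {
                // Record the end of the span even if the delegate throws.
                spans.add("finish " + name);
            }
        };
    }
}

// Hypothetical builder whose addProcessor can be overridden.
class SimpleTopologyBuilder {
    final List<ProcessorSupplier<String, String>> suppliers = new ArrayList<>();

    void addProcessor(String name, ProcessorSupplier<String, String> supplier) {
        suppliers.add(supplier);
    }
}

// Subclass that decorates every supplier as it is registered.
class TracingTopologyBuilder extends SimpleTopologyBuilder {
    private final List<String> spans;

    TracingTopologyBuilder(List<String> spans) {
        this.spans = spans;
    }

    @Override
    void addProcessor(String name, ProcessorSupplier<String, String> supplier) {
        super.addProcessor(name, new TracingProcessorSupplier<>(spans, name, supplier));
    }
}
```

Because the wrapping happens in `addProcessor`, callers register plain
suppliers and get traced processors back without changing their own code.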

Would it make sense to propose this as a change
(https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology), or
is there a better way to do this?
TopologyWrapper does something similar:
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java

Thanks in advance for any help.

Cheers,
Jorge.

Re: Accessing Topology Builder

Posted by John Roesler <jo...@confluent.io>.
Hi Jorge,

Thanks for the clarifications.

Yes, I'm also not sure what "built-in tracing" would look like, and it may
not be a good idea. FWIW, though, I was not thinking of something like
"rich functions". Rather, I was imagining that Streams could just always
record spans in headers as it processes the data, no need for the
interceptor.
However, it would be pretty complicated if you only wanted to record spans
for certain records, or a certain percentage of records. This is where a
tracing interceptor would look more attractive.

On the actual interceptor, offhand, I suppose two main options are
available:
1. define interceptor interfaces with hooks into the lifecycle, like you
gave. (before/after process might work, but in general, it might be more
like beforeProcess, beforeForward)
2. just let the interceptor implement the same Processor interface, but
additionally have access to the "delegate" processor somehow?
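
A rough sketch of how the two options could fit together, purely as an
illustration: `RecordProcessor`, `ProcessorInterceptor`, `InterceptedProcessor`
and `RecordingInterceptor` are hypothetical names made up for this example, and
none of them exist in Kafka Streams. The wrapper implements the same processor
interface as its delegate (option 2) and calls lifecycle hooks around it
(option 1):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stream processor; not the Kafka Streams interface.
interface RecordProcessor<K, V> {
    void process(K key, V value);
}

// Option 1: hypothetical lifecycle hooks invoked around each record.
interface ProcessorInterceptor<K, V> {
    void beforeProcess(String processorName, K key, V value);

    void afterProcess(String processorName, K key, V value);
}

// Option 2: the wrapper implements the same interface and holds the delegate.
class InterceptedProcessor<K, V> implements RecordProcessor<K, V> {
    private final String name;
    private final RecordProcessor<K, V> delegate;
    private final ProcessorInterceptor<K, V> interceptor;

    InterceptedProcessor(String name, RecordProcessor<K, V> delegate,
                         ProcessorInterceptor<K, V> interceptor) {
        this.name = name;
        this.delegate = delegate;
        this.interceptor = interceptor;
    }

    @Override
    public void process(K key, V value) {
        interceptor.beforeProcess(name, key, value);
        try {
            delegate.process(key, value);
        } finally {
            // Always give the interceptor a chance to close its span.
            interceptor.afterProcess(name, key, value);
        }
    }
}

// Example interceptor that only records the order of the calls.
class RecordingInterceptor implements ProcessorInterceptor<String, String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void beforeProcess(String processorName, String key, String value) {
        events.add("before " + processorName + " " + key);
    }

    @Override
    public void afterProcess(String processorName, String key, String value) {
        events.add("after " + processorName + " " + key);
    }
}
```

A tracing implementation would start a span in `beforeProcess` and finish it in
`afterProcess`; a `beforeForward` hook would need access to the processor
context, which this sketch leaves out.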

What approach did you take in your TracingProcessor?

Thanks,
-John

On Tue, Sep 18, 2018 at 10:02 AM Jorge Esteban Quilcate Otoya <
quilcate.jorge@gmail.com> wrote:

> Thanks, Guozhang and John.
>
> @Guozhang:
>
> > I'd suggest providing a
> > WrapperProcessorSupplier for the users rather than modifying
> > InternalStreamsTopology: more specifically, you can provide an
> > `abstract WrapperProcessorSupplier
> > implements ProcessorSupplier` and then let users instantiate this
> > class instead of the "bare-metal" interface. WDYT?
>
> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
>
> ```
> public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
>   final KafkaTracing kafkaTracing;
>   final String name;
>   final ProcessorSupplier<K, V> delegate;
>
>   public TracingProcessorSupplier(KafkaTracing kafkaTracing,
>       String name, ProcessorSupplier<K, V> delegate) {
>     this.kafkaTracing = kafkaTracing;
>     this.name = name;
>     this.delegate = delegate;
>   }
>
>   @Override public Processor<K, V> get() {
>     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
>   }
> }
> ```
>
> My challenge is how to wrap the topology processors created by
> `StreamsBuilder#build` so that this instrumentation is easy for Kafka
> Streams users to adopt.
>
> @John:
>
> > The diff you posted only contains the library-side changes, and it's not
> > obvious how you would use this to insert the desired tracing code.
> > Perhaps you could provide a snippet demonstrating how you want to use
> > this change to enable tracing?
>
> My first approach was something like this:
>
> ```
> final StreamsBuilder builder = kafkaStreamsTracing.builder();
> ```
>
> Where `KafkaStreamsTracing#builder` looks like this:
>
> ```
>   public StreamsBuilder builder() {
>     return new StreamsBuilder(
>         new Topology(new TracingInternalTopologyBuilder(kafkaTracing)));
>   }
> ```
>
> Then, once the builder creates a topology, the processors will be wrapped
> by the `TracingProcessorSupplier` described above.
>
> This approach is probably too naive, but it works as an initial proof of
> concept.
>
> > Off the top of my head, here are some other approaches you might
> > evaluate:
> > * you mentioned interceptors. Perhaps we could create a
> > ProcessorInterceptor interface and add a config to set it.
>
> This sounds very interesting to me. Then we wouldn't need to touch internal
> APIs, and could just provide some configs. One challenge here is how to
> define the hooks. For the consumer/producer interceptors the lifecycle is
> clear: `onConsume`/`onSend` and then `onCommit`/`onAcknowledgement`. What
> would this look like for stream processors? Maybe
> `beforeProcess(context, key, value)` and `afterProcess(context, key, value)`.
>
> > * perhaps we could simply build the tracing headers into Streams. Is
> > there a benefit to making it customizable?
>
> I don't fully understand this option. Do you mean something like KIP-159
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams)?
> Making headers available in the Streams DSL would allow users to create
> "custom" traces, for instance:
>
> ```
> stream.map( (headers, k, v) -> {
>   Span span = kafkaTracing.nextSpan(headers).start();
>   doSomething(k, v);
>   span.finish();
> });
> ```
>
> but enabling headers in the Streams DSL alone won't make it possible to
> instrument the existing processors exposed by the DSL.
>
> If we can define a way to pass a `ProcessorSupplier` to be used by
> `StreamsBuilder#internalTopology` (not sure whether via the constructor or
> some other way), that would be enough to support this use case.
>
> > Also, as Matthias said, you would need to create a KIP to propose this
> > change, but of course we can continue this preliminary discussion until
> > you feel confident to create the KIP.
>
> Happy to do it once the approach is clearer.
>
> Cheers,
> Jorge.
>
> On Mon, Sep 17, 2018 at 17:09, John Roesler (<jo...@confluent.io>)
> wrote:
>
> > If I understand the request, it's about tracking the latencies for a
> > specific record, not the aggregated latencies for each processor.
> >
> > Jorge,
> >
> > The diff you posted only contains the library-side changes, and it's not
> > obvious how you would use this to insert the desired tracing code.
> > Perhaps you could provide a snippet demonstrating how you want to use
> > this change to enable tracing?
> >
> > Also, as Matthias said, you would need to create a KIP to propose this
> > change, but of course we can continue this preliminary discussion until
> > you feel confident to create the KIP.
> >
> > Off the top of my head, here are some other approaches you might
> > evaluate:
> > * you mentioned interceptors. Perhaps we could create a
> > ProcessorInterceptor interface and add a config to set it.
> > * perhaps we could simply build the tracing headers into Streams. Is
> > there a benefit to making it customizable?
> >
> > Thanks for considering this problem!
> > -John
> >
> > On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Hello Jorge,
> > >
> > > From the TracingProcessor implementation it seems you want to track
> > > per-processor processing latency, is that right? If this is the case,
> > > you can actually use the per-processor metrics, which include latency
> > > sensors.
> > >
> > > If you do want to track, for a certain record, the latency of
> > > processing it, then you'd probably need the processor implementation in
> > > your repo. In this case, though, I'd suggest providing a
> > > WrapperProcessorSupplier for the users rather than modifying
> > > InternalStreamsTopology: more specifically, you can provide an
> > > `abstract WrapperProcessorSupplier
> > > implements ProcessorSupplier` and then let users instantiate this
> > > class instead of the "bare-metal" interface. WDYT?
> > >
> > >
> > > Guozhang
> > >
> > > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> > > quilcate.jorge@gmail.com> wrote:
> > >
> > > > Thanks for your answer, Matthias!
> > > >
> > > > What I'm looking for is something similar to interceptors, but for
> > > > Stream Processors.
> > > >
> > > > In Zipkin (and probably other tracing implementations as well) we use
> > > > Headers to propagate the context of a trace (i.e. adding metadata to
> > > > the Kafka Record, so we can create references to a trace).
> > > > Now that Headers are part of the Kafka Streams Processor API, we can
> > > > propagate context from inputs (Consumers) to outputs (Producers) by
> > > > using a `KafkaClientSupplier` (e.g.
> > > > <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
> > > >
> > > > "Input to Output" traces could be enough for some use cases, but we
> > > > are looking for a more detailed trace that could cover cases like
> > > > side effects (e.g. for each processor), where input/output and
> > > > processor latencies can be recorded. This is why I have been looking
> > > > at how to decorate the `ProcessorSupplier`, and the reason for all the
> > > > changes shown in the comparison. Here is a gist of how we are
> > > > planning to decorate the `addProcessor` method:
> > > > https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > > >
> > > > Hope this makes a bit more sense now :)
> > > >
> > > > On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<matthias@confluent.io>)
> > > > wrote:
> > > >
> > > > > >> I'm experimenting on how to add tracing to Kafka Streams.
> > > > >
> > > > > What do you mean by this exactly? Is there a JIRA? I am fine with
> > > > > removing `final` from `InternalTopologyBuilder#addProcessor()` -- it's
> > > > > an internal class.
> > > > >
> > > > > However, the diff also shows
> > > > >
> > > > > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> > > > >
> > > > > This has two impacts: first, it modifies `Topology`, which is part of
> > > > > the public API and would require a KIP. Second, it exposes
> > > > > `InternalTopologyBuilder` as part of the public API -- something we
> > > > > should not do.
> > > > >
> > > > > I am also not sure why you want to do this (btw: also a public API
> > > > > change requiring a KIP). However, this should not be necessary.
> > > > >
> > > > > >     public StreamsBuilder(final Topology topology)  {
> > > > >
> > > > >
> > > > > I think I am lacking some context on what you are trying to achieve.
> > > > > Maybe you can elaborate on the problem you are trying to solve?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Re: Accessing Topology Builder

Posted by "Matthias J. Sax" <ma...@confluent.io>.
For most operators, yes. This certainly holds for all stateless operators
like `mapValues()`. For stateful operators, it depends on whether data
repartitioning is required. If it is, the topology is split into two
sub-topologies, and thus the two `peek()` operations could run on different
threads.

If you want to double-check, you can describe a topology before
executing it via `Topology.describe()` -- all operators of a single
sub-topology will be executed single-threaded.


-Matthias


On 9/26/18 12:14 AM, Jorge Esteban Quilcate Otoya wrote:
> Good to know, thanks Matthias!
> 
> You've mentioned a previous operator, but what about:
> `peek().mapValues().peek()`, will both `peek`s be in the same thread as
> well?
> 
> On Tue, Sep 25, 2018 at 23:14, Matthias J. Sax (<ma...@confluent.io>)
> wrote:
>
>> Just for clarification:
>>
>> `peek()` would run on the same thread as the previous operator. Even
>> if---strictly speaking---there is no public contract to guarantee this,
>> it would be the case in the current implementation, and I also don't see
>> any reason why this would change at any point in the future, because
>> it's the most efficient implementation I can think of.
>>
>> -Matthias
>>
>> On 9/22/18 4:51 AM, Jorge Esteban Quilcate Otoya wrote:
>>> Thanks, everyone!
>>>
>>> @Bill, the main issue with using `KStream#peek()` is that, AFAIK, each
>>> `peek` processor runs on a potentially different thread, so passing the
>>> trace between them could be challenging. It would also require users to
>>> add these operators themselves, which could be too cumbersome.
>>>
>>> @Guozhang and @John: I will first focus on creating the
>>> `TracingProcessorSupplier` for instrumenting custom `Processors`, and I
>>> will keep the idea of a `ProcessorInterceptor` in the back of my head to
>>> see if it makes sense to propose a KIP for this.
>>>
>>> Thanks again for your feedback!
>>>
>>> Cheers,
>>> Jorge.
>>>
>>> On Wed, Sep 19, 2018 at 1:55, Bill Bejeck (<bb...@gmail.com>)
>>> wrote:
>>>
>>>> Jorge:
>>>>
>>>> I have a crazy idea off the top of my head.
>>>>
>>>> Would something as low-tech as using KStream.peek calls on either side of
>>>> certain processors to record start and end times work?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wa...@gmail.com> wrote:
>>>>
>>>>> Jorge:
>>>>>
>>>>> My suggestion was to let your users implement the
>>>>> TracingProcessorSupplier / TracingProcessor directly instead of the
>>>>> base-line ProcessorSupplier / Processor. Would that work for you?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>


Re: Accessing Topology Builder

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Good to know, thanks Matthias!

You've mentioned a previous operator, but what about:
`peek().mapValues().peek()`, will both `peek`s be in the same thread as
well?

> >>>>
> >>>> I don't understand this option completely. Do you mean something like
> >>>> KIP-159 (
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>> 159%3A+Introducing+Rich+functions+to+Streams
> >>>> )?
> >>>> Headers available on StreamsDSL will allow users to create "custom"
> >>> traces,
> >>>> for instance:
> >>>>
> >>>> ```
> >>>> stream.map( (headers, k, v) -> {
> >>>>   Span span = kafkaTracing.nextSpan(headers).start();
> >>>>   doSomething(k, v);
> >>>>   span.finish();
> >>>> });
> >>>> ```
> >>>>
> >>>> but it won't be possible to instrument the existing processors exposed
> >> by
> >>>> DSL only by enabling headers on Streams DSL.
> >>>>
> >>>> If we can define a way to pass a `ProcessorSupplier` to be used by
> >>>> `StreamsBuilder#internalTopology` -not sure if via constructor or some
> >>>> other way- would be enough to support this use-case.
> >>>>
> >>>>> Also, as Matthias said, you would need to create a KIP to propose
> >> this
> >>>>> change, but of course we can continue this preliminary discussion
> >> until
> >>>> you
> >>>>> feel confident to create the KIP.
> >>>>
> >>>> Happy to do it once the approach is clearer.
> >>>>
> >>>> Cheers,
> >>>> Jorge.
> >>>>
> >>>> On Mon, Sep 17, 2018 at 17:09, John Roesler (<john@confluent.io>)
> >>>> wrote:
> >>>>
> >>>>> If I understand the request, it's about tracking the latencies for a
> >>>>> specific record, not the aggregated latencies for each processor.
> >>>>>
> >>>>> Jorge,
> >>>>>
> >>>>> The diff you posted only contains the library-side changes, and it's
> >>> not
> >>>>> obvious how you would use this to insert the desired tracing code.
> >>>>> Perhaps you could provide a snippet demonstrating how you want to use
> >>>> this
> >>>>> change to enable tracing?
> >>>>>
> >>>>> Also, as Matthias said, you would need to create a KIP to propose
> >> this
> >>>>> change, but of course we can continue this preliminary discussion
> >> until
> >>>> you
> >>>>> feel confident to create the KIP.
> >>>>>
> >>>>> Off the top of my head, here are some other approaches you might
> >>>> evaluate:
> >>>>> * you mentioned interceptors. Perhaps we could create a
> >>>>> ProcessorInterceptor interface and add a config to set it.
> >>>>> * perhaps we could simply build the tracing headers into Streams. Is
> >>>> there
> >>>>> a benefit to making it customizable?
> >>>>>
> >>>>> Thanks for considering this problem!
> >>>>> -John
> >>>>>
> >>>>> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wa...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> Hello Jorge,
> >>>>>>
> >>>>>> From the TracingProcessor implementation it seems you want to track
> >>>>>> per-processor processing latency, is that right? If this is the
> >> case
> >>>> you
> >>>>>> can actually use the per-processor metrics which include latency
> >>>> sensors.
> >>>>>>
> >>>>>> If you do want to track, for a certain record, what's the latency
> >> of
> >>>>>> processing it, then you'd probably need the processor
> >> implementation
> >>> in
> >>>>>> your repo. In this case, though, I'd suggest to provide a
> >>>>>> WrapperProcessorSupplier for the users than modifying
> >>>>>> InternalStreamsTopology: more specifically, you can provide an
> >>>>>> `abstract WrapperProcessorSupplier
> >>>>>> implements ProcessorSupplier` and then let users to instantiate
> >> this
> >>>>> class
> >>>>>> instead of the "bare-metal" interface. WDYT?
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> >>>>>> quilcate.jorge@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks for your answer, Matthias!
> >>>>>>>
> >>>>>>> What I'm looking for is something similar to interceptors, but
> >> for
> >>>>> Stream
> >>>>>>> Processors.
> >>>>>>>
> >>>>>>> In Zipkin -and probably other tracing implementations as well- we
> >>> are
> >>>>>> using
> >>>>>>> Headers to propagate the context of a trace (i.e. adding metadata
> >>> to
> >>>>> the
> >>>>>>> Kafka Record, so we can create references to a trace).
> >>>>>>> Now that Headers are part of Kafka Streams Processor API, we can
> >>>>>> propagate
> >>>>>>> context from input (Consumers) to outputs (Producers) by using
> >>>>>>> `KafkaClientSupplier` (e.g. <
> >>>>>>> https://github.com/openzipkin/brave/blob/master/
> >>>>>>> instrumentation/kafka-streams/src/main/java/brave/kafka/streams/
> >>>>>>> TracingKafkaClientSupplier.java
> >>>>>>>> ).
> >>>>>>>
> >>>>>>> "Input to Output" traces could be enough for some use-cases, but
> >> we
> >>>> are
> >>>>>>> looking for a more detailed trace -that could cover cases like
> >>>>>> side-effects
> >>>>>>> (e.g. for each processor), where input/output and processors
> >>>> latencies
> >>>>>> can
> >>>>>>> be recorded. This is why I have been looking for how to decorate
> >>> the
> >>>>>>> `ProcessorSupplier` and all the changes shown in the comparison.
> >>> Here
> >>>>> is
> >>>>>> a
> >>>>>>> gist of how we are planning to decorate the `addProcessor`
> >> method:
> >>>>>>> https://github.com/openzipkin/brave/compare/master...jeqo:
> >>>>>>> kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> >>>>>>>
> >>>>>>> Hope this makes a bit more sense now :)
> >>>>>>>
> >>>>>>> On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<
> >>>>>>> matthias@confluent.io>)
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
> >>>>>>>>
> >>>>>>>> What do you mean by this exactly? Is there a JIRA? I am fine
> >>>> removing
> >>>>>>>> `final` from `InternalTopologyBuilder#addProcessor()` -- it's
> >> an
> >>>>>>>> internal class.
> >>>>>>>>
> >>>>>>>> However, the diff also shows
> >>>>>>>>
> >>>>>>>>> public Topology(final InternalTopologyBuilder
> >>>>>> internalTopologyBuilder)
> >>>>>>> {
> >>>>>>>>
> >>>>>>>> This has two impacts: first, it modifies `Topology` what is
> >> part
> >>> of
> >>>>>>>> public API and would require a KIP. Second, it exposes
> >>>>>>>> `InternalTopologyBuilder` as part of the public API --
> >> something
> >>> we
> >>>>>>>> should not do.
> >>>>>>>>
> >>>>>>>> I am also not sure, why you want to do this (btw: also public
> >> API
> >>>>>> change
> >>>>>>>> requiring a KIP). However, this should not be necessary.
> >>>>>>>>
> >>>>>>>>>     public StreamsBuilder(final Topology topology)  {
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> I think I am lacking some context what you try to achieve.
> >> Maybe
> >>>> you
> >>>>>> can
> >>>>>>>> elaborate in the problem you try to solve?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> >>>>>>>>> Hi everyone,
> >>>>>>>>>
> >>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
> >>>>>>>>>
> >>>>>>>>> One option is to override and access
> >>>>>>>>> `InternalTopologyBuilder#addProcessor`. Currently this method
> >>>> it is
> >>>>>>>> final,
> >>>>>>>>> and builder is not exposed as part of `StreamsBuilder`:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> public class StreamsBuilder {
> >>>>>>>>>
> >>>>>>>>>     /** The actual topology that is constructed by this
> >>>>>> StreamsBuilder.
> >>>>>>>> */
> >>>>>>>>>     private final Topology topology = new Topology();
> >>>>>>>>>
> >>>>>>>>>     /** The topology's internal builder. */
> >>>>>>>>>     final InternalTopologyBuilder internalTopologyBuilder =
> >>>>>>>>> topology.internalTopologyBuilder;
> >>>>>>>>>
> >>>>>>>>>     private final InternalStreamsBuilder
> >>> internalStreamsBuilder =
> >>>>> new
> >>>>>>>>> InternalStreamsBuilder(internalTopologyBuilder);
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> The goal is that If `builder#addProcessor` is exposed, we
> >> could
> >>>>>>> decorate
> >>>>>>>>> every `ProcessorSupplier` and capture traces from it:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> @Override
> >>>>>>>>>   public void addProcessor(String name, ProcessorSupplier
> >>>> supplier,
> >>>>>>>>> String... predecessorNames) {
> >>>>>>>>>     super.addProcessor(name, new TracingProcessorSupplier(
> >>>> tracer,
> >>>>>>> name,
> >>>>>>>>> supplier), predecessorNames);
> >>>>>>>>>   }
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> Would it make sense to propose this as a change:
> >>>>>>>>>
> >>>>>>
> >>> https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology
> >>>>>>> ?
> >>>>>>>> or
> >>>>>>>>> maybe there is a better way to do this?
> >>>>>>>>> TopologyWrapper does something similar:
> >>>>>>>>>
> >>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/
> >>>>>>> test/java/org/apache/kafka/streams/TopologyWrapper.java
> >>>>>>>>>
> >>>>>>>>> Thanks in advance for any help.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Jorge.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
>
>

Re: Accessing Topology Builder

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Just for clarification:

`peek()` would run on the same thread as the previous operator. Even if,
strictly speaking, there is no public contract to guarantee this,
it is the case in the current implementation, and I also don't see
any reason why this would change at any point in the future, because
it's the most efficient implementation I can think of.
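
To illustrate the threading point, here is a minimal sketch in plain Java.
It does not use Kafka Streams: `MiniStream` is a hypothetical stand-in whose
stages call each other synchronously, which is only an assumption modelling
the behaviour described above. Under that assumption a `ThreadLocal` is
enough to carry a start time from the `peek` before an operator to the
`peek` after it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class PeekTimingSketch {

    /** Hypothetical stand-in for an operator chain; NOT the Kafka Streams API. */
    static final class MiniStream<T> {
        private final List<Consumer<T>> downstream = new ArrayList<>();

        /** Runs the action for its side effect, then forwards the value unchanged. */
        MiniStream<T> peek(Consumer<T> action) {
            MiniStream<T> next = new MiniStream<>();
            downstream.add(value -> {
                action.accept(value);
                next.emit(value);
            });
            return next;
        }

        /** Transforms the value and forwards the result. */
        <R> MiniStream<R> mapValues(Function<T, R> mapper) {
            MiniStream<R> next = new MiniStream<>();
            downstream.add(value -> next.emit(mapper.apply(value)));
            return next;
        }

        /** Pushes a value through all child stages on the calling thread. */
        void emit(T value) {
            for (Consumer<T> stage : downstream) {
                stage.accept(value);
            }
        }
    }

    // Holds the start time per thread; only valid if both peeks share a thread.
    private static final ThreadLocal<Long> START_NANOS = new ThreadLocal<>();

    /** Sends each record through peek -> mapValues -> peek and returns the events seen. */
    static List<String> run(String... records) {
        List<String> events = new ArrayList<>();
        MiniStream<String> source = new MiniStream<>();
        source
            .peek(v -> {
                START_NANOS.set(System.nanoTime());
                events.add("start:" + Thread.currentThread().getName());
            })
            .mapValues(String::toUpperCase)
            .peek(v -> {
                long elapsedNanos = System.nanoTime() - START_NANOS.get();
                events.add("end:" + Thread.currentThread().getName()
                    + ":" + v + ":" + (elapsedNanos >= 0));
            });
        for (String record : records) {
            source.emit(record);
        }
        return events;
    }

    public static void main(String[] args) {
        run("a", "b").forEach(System.out::println);
    }
}
```

If the two `peek` stages ever ran on different threads, `START_NANOS.get()`
would return null in the second one, so this trick depends entirely on the
same-thread behaviour and not on any documented guarantee.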

-Matthias

On 9/22/18 4:51 AM, Jorge Esteban Quilcate Otoya wrote:
> Thanks, everyone!
> 
> @Bill, the main issue with using `KStream#peek()` is that AFAIK each `peek`
> processor runs on a potentially different thread, so passing the trace
> between them could be challenging. It would also require users to add these
> operators themselves, which could be too cumbersome.
> 
> @Guozhang and @John: I will first focus on creating the
> `TracingProcessorSupplier` for instrumenting custom `Processors`, and I will
> keep the idea of a `ProcessorInterceptor` in the back of my head to see if
> it makes sense to propose a KIP for this.
> 
> Thanks again for your feedback!
> 
> Cheers,
> Jorge.
> On Wed, Sep 19, 2018 at 1:55, Bill Bejeck (<bb...@gmail.com>)
> wrote:
> 
>> Jorge:
>>
>> I have a crazy idea off the top of my head.
>>
>> Would something as low-tech as using KStream.peek calls on either side of
>> certain processors to record start and end times work?
>>
>> Thanks,
>> Bill
>>
>> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wa...@gmail.com> wrote:
>>
>>> Jorge:
>>>
>>> My suggestion was to let your users to implement on the
>>> TracingProcessorSupplier
>>> / TracingProcessor directly instead of the base-line ProcessorSupplier /
>>> Processor. Would that work for you?
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
>>> quilcate.jorge@gmail.com> wrote:
>>>
>>>> Thanks Guozhang and John.
>>>>
>>>> @Guozhang:
>>>>
>>>>> I'd suggest to provide a
>>>>> WrapperProcessorSupplier for the users than modifying
>>>>> InternalStreamsTopology: more specifically, you can provide an
>>>>> `abstract WrapperProcessorSupplier
>>>>> implements ProcessorSupplier` and then let users to instantiate this
>>>> class
>>>>> instead of the "bare-metal" interface. WDYT?
>>>>
>>>> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
>>>>
>>>> ```
>>>> public class TracingProcessorSupplier<K, V> implements
>>> ProcessorSupplier<K,
>>>> V> {
>>>>   final KafkaTracing kafkaTracing;
>>>>   final String name;
>>>>   final ProcessorSupplier<K, V> delegate;
>>>>    public TracingProcessorSupplier(KafkaTracing kafkaTracing,
>>>>       String name, ProcessorSupplier<K, V> delegate) {
>>>>     this.kafkaTracing = kafkaTracing;
>>>>     this.name = name;
>>>>     this.delegate = delegate;
>>>>   }
>>>>    @Override public Processor<K, V> get() {
>>>>     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
>>>>   }
>>>> }
>>>> ```
>>>>
>>>> My challenge is how to wrap Topology Processors created by
>>>> `StreamsBuilder#build` to make this instrumentation easy to adopt by
>>> Kafka
>>>> Streams users.
>>>>
>>>> @John:
>>>>
>>>>> The diff you posted only contains the library-side changes, and it's
>>> not
>>>>> obvious how you would use this to insert the desired tracing code.
>>>>> Perhaps you could provide a snippet demonstrating how you want to use
>>>> this
>>>>> change to enable tracing?
>>>>
>>>> My first approach was something like this:
>>>>
>>>> ```
>>>> final StreamsBuilder builder = kafkaStreamsTracing.builder();
>>>> ```
>>>>
>>>> Where `KafkaStreamsTracing#builder` looks like this:
>>>>
>>>> ```
>>>>   public StreamsBuilder builder() {
>>>>     return new StreamsBuilder(new Topology(new
>>>> TracingInternalTopologyBuilder(kafkaTracing)));
>>>>   }
>>>> ```
>>>>
>>>> Then, once the builder creates a topology, `processors` will be wrapped
>>> by
>>>> `TracingProcessorSupplier` described above.
>>>>
>>>> Probably this approach is too naive but works as an initial proof of
>>>> concept.
>>>>
>>>>> Off the top of my head, here are some other approaches you might
>>>> evaluate:
>>>>> * you mentioned interceptors. Perhaps we could create a
>>>>> ProcessorInterceptor interface and add a config to set it.
>>>>
>>>> This sounds very interesting to me. Then we won't need to touch
>> internal
>>>> API's, and just provide some configs. One challenge here is how to
>> define
>>>> the hooks. In consumer/producer, lifecycle is clear,
>>> `onConsumer`/`onSend`
>>>> and then `onCommit`/`onAck` methods. For Stream processors, how this
>> will
>>>> look like? Maybe `beforeProcess(context, key, value)` and
>>>> `afterProcess(context, key, value)`.
>>>>
>>>>> * perhaps we could simply build the tracing headers into Streams. Is
>>>> there
>>>>> a benefit to making it customizable?
>>>>
>>>> I don't understand this option completely. Do you mean something like
>>>> KIP-159 (
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>> 159%3A+Introducing+Rich+functions+to+Streams
>>>> )?
>>>> Headers available on StreamsDSL will allow users to create "custom"
>>> traces,
>>>> for instance:
>>>>
>>>> ```
>>>> stream.map( (headers, k, v) -> {
>>>>   Span span = kafkaTracing.nextSpan(headers).start();
>>>>   doSomething(k, v);
>>>>   span.finish();
>>>> });
>>>> ```
>>>>
>>>> but it won't be possible to instrument the existing processors exposed
>> by
>>>> DSL only by enabling headers on Streams DSL.
>>>>
>>>> If we can define a way to pass a `ProcessorSupplier` to be used by
>>>> `StreamsBuilder#internalTopology` -not sure if via constructor or some
>>>> other way- would be enough to support this use-case.
>>>>
>>>>> Also, as Matthias said, you would need to create a KIP to propose
>> this
>>>>> change, but of course we can continue this preliminary discussion
>> until
>>>> you
>>>>> feel confident to create the KIP.
>>>>
>>>> Happy to do it once the approach is clearer.
>>>>
>>>> Cheers,
>>>> Jorge.
>>>>
>>>> On Mon, Sep 17, 2018 at 17:09, John Roesler (<jo...@confluent.io>)
>>>> wrote:
>>>>
>>>>> If I understand the request, it's about tracking the latencies for a
>>>>> specific record, not the aggregated latencies for each processor.
>>>>>
>>>>> Jorge,
>>>>>
>>>>> The diff you posted only contains the library-side changes, and it's
>>> not
>>>>> obvious how you would use this to insert the desired tracing code.
>>>>> Perhaps you could provide a snippet demonstrating how you want to use
>>>> this
>>>>> change to enable tracing?
>>>>>
>>>>> Also, as Matthias said, you would need to create a KIP to propose
>> this
>>>>> change, but of course we can continue this preliminary discussion
>> until
>>>> you
>>>>> feel confident to create the KIP.
>>>>>
>>>>> Off the top of my head, here are some other approaches you might
>>>> evaluate:
>>>>> * you mentioned interceptors. Perhaps we could create a
>>>>> ProcessorInterceptor interface and add a config to set it.
>>>>> * perhaps we could simply build the tracing headers into Streams. Is
>>>> there
>>>>> a benefit to making it customizable?
>>>>>
>>>>> Thanks for considering this problem!
>>>>> -John
>>>>>
>>>>> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wa...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Hello Jorge,
>>>>>>
>>>>>> From the TracingProcessor implementation it seems you want to track
>>>>>> per-processor processing latency, is that right? If this is the
>> case
>>>> you
>>>>>> can actually use the per-processor metrics which include latency
>>>> sensors.
>>>>>>
>>>>>> If you do want to track, for a certain record, what's the latency
>> of
>>>>>> processing it, then you'd probably need the processor
>> implementation
>>> in
>>>>>> your repo. In this case, though, I'd suggest to provide a
>>>>>> WrapperProcessorSupplier for the users than modifying
>>>>>> InternalStreamsTopology: more specifically, you can provide an
>>>>>> `abstract WrapperProcessorSupplier
>>>>>> implements ProcessorSupplier` and then let users to instantiate
>> this
>>>>> class
>>>>>> instead of the "bare-metal" interface. WDYT?
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
>>>>>> quilcate.jorge@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for your answer, Matthias!
>>>>>>>
>>>>>>> What I'm looking for is something similar to interceptors, but
>> for
>>>>> Stream
>>>>>>> Processors.
>>>>>>>
>>>>>>> In Zipkin -and probably other tracing implementations as well- we
>>> are
>>>>>> using
>>>>>>> Headers to propagate the context of a trace (i.e. adding metadata
>>> to
>>>>> the
>>>>>>> Kafka Record, so we can create references to a trace).
>>>>>>> Now that Headers are part of Kafka Streams Processor API, we can
>>>>>> propagate
>>>>>>> context from input (Consumers) to outputs (Producers) by using
>>>>>>> `KafkaClientSupplier` (e.g. <
>>>>>>> https://github.com/openzipkin/brave/blob/master/
>>>>>>> instrumentation/kafka-streams/src/main/java/brave/kafka/streams/
>>>>>>> TracingKafkaClientSupplier.java
>>>>>>>> ).
>>>>>>>
>>>>>>> "Input to Output" traces could be enough for some use-cases, but
>> we
>>>> are
>>>>>>> looking for a more detailed trace -that could cover cases like
>>>>>> side-effects
>>>>>>> (e.g. for each processor), where input/output and processors
>>>> latencies
>>>>>> can
>>>>>>> be recorded. This is why I have been looking for how to decorate
>>> the
>>>>>>> `ProcessorSupplier` and all the changes shown in the comparison.
>>> Here
>>>>> is
>>>>>> a
>>>>>>> gist of how we are planning to decorate the `addProcessor`
>> method:
>>>>>>> https://github.com/openzipkin/brave/compare/master...jeqo:
>>>>>>> kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
>>>>>>>
>>>>>>> Hope this makes a bit more sense now :)
>>>>>>>
>>>>>>> On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<
>>>>>>> matthias@confluent.io>)
>>>>>>> wrote:
>>>>>>>
>>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>
>>>>>>>> What do you mean by this exactly? Is there a JIRA? I am fine
>>>> removing
>>>>>>>> `final` from `InternalTopologyBuilder#addProcessor()` -- it's
>> an
>>>>>>>> internal class.
>>>>>>>>
>>>>>>>> However, the diff also shows
>>>>>>>>
>>>>>>>>> public Topology(final InternalTopologyBuilder
>>>>>> internalTopologyBuilder)
>>>>>>> {
>>>>>>>>
>>>>>>>> This has two impacts: first, it modifies `Topology` what is
>> part
>>> of
>>>>>>>> public API and would require a KIP. Second, it exposes
>>>>>>>> `InternalTopologyBuilder` as part of the public API --
>> something
>>> we
>>>>>>>> should not do.
>>>>>>>>
>>>>>>>> I am also not sure, why you want to do this (btw: also public
>> API
>>>>>> change
>>>>>>>> requiring a KIP). However, this should not be necessary.
>>>>>>>>
>>>>>>>>>     public StreamsBuilder(final Topology topology)  {
>>>>>>>>
>>>>>>>>
>>>>>>>> I think I am lacking some context what you try to achieve.
>> Maybe
>>>> you
>>>>>> can
>>>>>>>> elaborate in the problem you try to solve?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>>
>>>>>>>>> One option is to override and access
>>>>>>>>> `InternalTopologyBuilder#addProcessor`. Currently this method
>>>> it is
>>>>>>>> final,
>>>>>>>>> and builder is not exposed as part of `StreamsBuilder`:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> public class StreamsBuilder {
>>>>>>>>>
>>>>>>>>>     /** The actual topology that is constructed by this
>>>>>> StreamsBuilder.
>>>>>>>> */
>>>>>>>>>     private final Topology topology = new Topology();
>>>>>>>>>
>>>>>>>>>     /** The topology's internal builder. */
>>>>>>>>>     final InternalTopologyBuilder internalTopologyBuilder =
>>>>>>>>> topology.internalTopologyBuilder;
>>>>>>>>>
>>>>>>>>>     private final InternalStreamsBuilder
>>> internalStreamsBuilder =
>>>>> new
>>>>>>>>> InternalStreamsBuilder(internalTopologyBuilder);
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> The goal is that If `builder#addProcessor` is exposed, we
>> could
>>>>>>> decorate
>>>>>>>>> every `ProcessorSupplier` and capture traces from it:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> @Override
>>>>>>>>>   public void addProcessor(String name, ProcessorSupplier
>>>> supplier,
>>>>>>>>> String... predecessorNames) {
>>>>>>>>>     super.addProcessor(name, new TracingProcessorSupplier(
>>>> tracer,
>>>>>>> name,
>>>>>>>>> supplier), predecessorNames);
>>>>>>>>>   }
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Would it make sense to propose this as a change:
>>>>>>>>>
>>>>>>
>>> https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology
>>>>>>> ?
>>>>>>>> or
>>>>>>>>> maybe there is a better way to do this?
>>>>>>>>> TopologyWrapper does something similar:
>>>>>>>>>
>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/
>>>>>>> test/java/org/apache/kafka/streams/TopologyWrapper.java
>>>>>>>>>
>>>>>>>>> Thanks in advance for any help.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jorge.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 


Re: Accessing Topology Builder

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Thanks, everyone!

@Bill, the main issue with using `KStream#peek()` is that AFAIK each `peek`
processor runs on a potentially different thread, so passing the trace
between them could be challenging. It would also require users to add these
operators themselves, which could be too cumbersome.

@Guozhang and @John: I will first focus on creating the
`TracingProcessorSupplier` for instrumenting custom `Processors`, and I will
keep the idea of a `ProcessorInterceptor` in the back of my head to see if
it makes sense to propose a KIP for this.
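
As a rough illustration of how a `ProcessorInterceptor` could bracket a
wrapped supplier, here is a self-contained sketch. Everything in it is an
assumption for discussion: `Processor` and `ProcessorSupplier` are simplified
local stand-ins (the real Kafka Streams interfaces also have `init` and
`close`), and `ProcessorInterceptor` with `beforeProcess`/`afterProcess` is
hypothetical, only mirroring the hook names floated earlier in this thread.

```java
import java.util.ArrayList;
import java.util.List;

final class InterceptorSketch {

    // Simplified stand-ins for the Kafka Streams interfaces (init/close omitted).
    interface Processor<K, V> {
        void process(K key, V value);
    }

    interface ProcessorSupplier<K, V> {
        Processor<K, V> get();
    }

    /** Hypothetical hook interface; not part of any Kafka release. */
    interface ProcessorInterceptor<K, V> {
        void beforeProcess(String processorName, K key, V value);

        void afterProcess(String processorName, K key, V value);
    }

    /** Decorates a supplier so every processor it creates is bracketed by the hooks. */
    static <K, V> ProcessorSupplier<K, V> intercept(
            String name,
            ProcessorSupplier<K, V> delegate,
            ProcessorInterceptor<K, V> interceptor) {
        return () -> {
            Processor<K, V> processor = delegate.get();
            return (key, value) -> {
                interceptor.beforeProcess(name, key, value);
                try {
                    processor.process(key, value);
                } finally {
                    // Runs even if the delegate throws, so a span could always be finished.
                    interceptor.afterProcess(name, key, value);
                }
            };
        };
    }

    /** Pushes one record through a wrapped processor and returns the call order. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        ProcessorInterceptor<String, String> recorder =
            new ProcessorInterceptor<String, String>() {
                @Override
                public void beforeProcess(String processorName, String key, String value) {
                    calls.add("before:" + processorName);
                }

                @Override
                public void afterProcess(String processorName, String key, String value) {
                    calls.add("after:" + processorName);
                }
            };
        ProcessorSupplier<String, String> business =
            () -> (key, value) -> calls.add("process:" + key + "=" + value);
        intercept("my-processor", business, recorder).get().process("k", "v");
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A tracing implementation would start a span in `beforeProcess` and finish it
in `afterProcess`. The open question of how such an interceptor would be
configured and applied to DSL-generated processors is not addressed here.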

Thanks again for your feedback!

Cheers,
Jorge.
On Wed, Sep 19, 2018 at 1:55, Bill Bejeck (<bb...@gmail.com>)
wrote:

> Jorge:
>
> I have a crazy idea off the top of my head.
>
> Would something as low-tech as using KStream.peek calls on either side of
> certain processors to record start and end times work?
>
> Thanks,
> Bill
>
> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Jorge:
> >
> > My suggestion was to let your users to implement on the
> > TracingProcessorSupplier
> > / TracingProcessor directly instead of the base-line ProcessorSupplier /
> > Processor. Would that work for you?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
> > quilcate.jorge@gmail.com> wrote:
> >
> > > Thanks Guozhang and John.
> > >
> > > @Guozhang:
> > >
> > > > I'd suggest to provide a
> > > > WrapperProcessorSupplier for the users than modifying
> > > > InternalStreamsTopology: more specifically, you can provide an
> > > > `abstract WrapperProcessorSupplier
> > > > implements ProcessorSupplier` and then let users to instantiate this
> > > class
> > > > instead of the "bare-metal" interface. WDYT?
> > >
> > > Yes, in the gist, I have a class implementing `ProcessorSupplier`:
> > >
> > > ```
> > > public class TracingProcessorSupplier<K, V> implements
> > ProcessorSupplier<K,
> > > V> {
> > >   final KafkaTracing kafkaTracing;
> > >   final String name;
> > >   final ProcessorSupplier<K, V> delegate;
> > >    public TracingProcessorSupplier(KafkaTracing kafkaTracing,
> > >       String name, ProcessorSupplier<K, V> delegate) {
> > >     this.kafkaTracing = kafkaTracing;
> > >     this.name = name;
> > >     this.delegate = delegate;
> > >   }
> > >    @Override public Processor<K, V> get() {
> > >     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
> > >   }
> > > }
> > > ```
> > >
> > > My challenge is how to wrap Topology Processors created by
> > > `StreamsBuilder#build` to make this instrumentation easy to adopt by
> > Kafka
> > > Streams users.
> > >
> > > @John:
> > >
> > > > The diff you posted only contains the library-side changes, and it's
> > not
> > > > obvious how you would use this to insert the desired tracing code.
> > > > Perhaps you could provide a snippet demonstrating how you want to use
> > > this
> > > > change to enable tracing?
> > >
> > > My first approach was something like this:
> > >
> > > ```
> > > final StreamsBuilder builder = kafkaStreamsTracing.builder();
> > > ```
> > >
> > > Where `KafkaStreamsTracing#builder` looks like this:
> > >
> > > ```
> > >   public StreamsBuilder builder() {
> > >     return new StreamsBuilder(new Topology(new
> > > TracingInternalTopologyBuilder(kafkaTracing)));
> > >   }
> > > ```
> > >
> > > Then, once the builder creates a topology, `processors` will be wrapped
> > by
> > > `TracingProcessorSupplier` described above.
> > >
> > > Probably this approach is too naive but works as an initial proof of
> > > concept.
> > >
> > > > Off the top of my head, here are some other approaches you might
> > > evaluate:
> > > > * you mentioned interceptors. Perhaps we could create a
> > > > ProcessorInterceptor interface and add a config to set it.
> > >
> > > This sounds very interesting to me. Then we won't need to touch
> internal
> > > API's, and just provide some configs. One challenge here is how to
> define
> > > the hooks. In consumer/producer, lifecycle is clear,
> > `onConsumer`/`onSend`
> > > and then `onCommit`/`onAck` methods. For Stream processors, how this
> will
> > > look like? Maybe `beforeProcess(context, key, value)` and
> > > `afterProcess(context, key, value)`.
> > >
> > > > * perhaps we could simply build the tracing headers into Streams. Is
> > > there
> > > > a benefit to making it customizable?
> > >
> > > I don't understand this option completely. Do you mean something like
> > > KIP-159 (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 159%3A+Introducing+Rich+functions+to+Streams
> > > )?
> > > Headers available on StreamsDSL will allow users to create "custom"
> > traces,
> > > for instance:
> > >
> > > ```
> > > stream.map( (headers, k, v) -> {
> > >   Span span = kafkaTracing.nextSpan(headers).start();
> > >   doSomething(k, v);
> > >   span.finish();
> > > });
> > > ```
> > >
> > > but it won't be possible to instrument the existing processors exposed
> by
> > > DSL only by enabling headers on Streams DSL.
> > >
> > > If we can define a way to pass a `ProcessorSupplier` to be used by
> > > `StreamsBuilder#internalTopology` -not sure if via constructor or some
> > > other way- would be enough to support this use-case.
> > >
> > > > Also, as Matthias said, you would need to create a KIP to propose
> this
> > > > change, but of course we can continue this preliminary discussion
> until
> > > you
> > > > feel confident to create the KIP.
> > >
> > > Happy to do it once the approach is clearer.
> > >
> > > Cheers,
> > > Jorge.
> > >
> > > On Mon, Sep 17, 2018 at 17:09, John Roesler (<jo...@confluent.io>)
> > > wrote:
> > >
> > > > If I understand the request, it's about tracking the latencies for a
> > > > specific record, not the aggregated latencies for each processor.
> > > >
> > > > Jorge,
> > > >
> > > > The diff you posted only contains the library-side changes, and it's
> > not
> > > > obvious how you would use this to insert the desired tracing code.
> > > > Perhaps you could provide a snippet demonstrating how you want to use
> > > this
> > > > change to enable tracing?
> > > >
> > > > Also, as Matthias said, you would need to create a KIP to propose
> this
> > > > change, but of course we can continue this preliminary discussion
> until
> > > you
> > > > feel confident to create the KIP.
> > > >
> > > > Off the top of my head, here are some other approaches you might
> > > evaluate:
> > > > * you mentioned interceptors. Perhaps we could create a
> > > > ProcessorInterceptor interface and add a config to set it.
> > > > * perhaps we could simply build the tracing headers into Streams. Is
> > > there
> > > > a benefit to making it customizable?
> > > >
> > > > Thanks for considering this problem!
> > > > -John
> > > >
> > > > On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Jorge,
> > > > >
> > > > > From the TracingProcessor implementation it seems you want to track
> > > > > per-processor processing latency, is that right? If this is the
> case
> > > you
> > > > > can actually use the per-processor metrics which include latency
> > > sensors.
> > > > >
> > > > > If you do want to track, for a certain record, what's the latency
> of
> > > > > processing it, then you'd probably need the processor
> implementation
> > in
> > > > > your repo. In this case, though, I'd suggest to provide a
> > > > > WrapperProcessorSupplier for the users than modifying
> > > > > InternalStreamsTopology: more specifically, you can provide an
> > > > > `abstract WrapperProcessorSupplier
> > > > > implements ProcessorSupplier` and then let users to instantiate
> this
> > > > class
> > > > > instead of the "bare-metal" interface. WDYT?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jorge@gmail.com> wrote:
> > > > >
> > > > > > Thanks for your answer, Matthias!
> > > > > >
> > > > > > What I'm looking for is something similar to interceptors, but
> for
> > > > Stream
> > > > > > Processors.
> > > > > >
> > > > > > In Zipkin -and probably other tracing implementations as well- we
> > are
> > > > > using
> > > > > > Headers to propagate the context of a trace (i.e. adding metadata
> > to
> > > > the
> > > > > > Kafka Record, so we can create references to a trace).
> > > > > > Now that Headers are part of Kafka Streams Processor API, we can
> > > > > propagate
> > > > > > context from input (Consumers) to outputs (Producers) by using
> > > > > > `KafkaClientSupplier` (e.g. <
> > > > > > https://github.com/openzipkin/brave/blob/master/
> > > > > > instrumentation/kafka-streams/src/main/java/brave/kafka/streams/
> > > > > > TracingKafkaClientSupplier.java
> > > > > > >).
> > > > > >
> > > > > > "Input to Output" traces could be enough for some use-cases, but
> we
> > > are
> > > > > > looking for a more detailed trace -that could cover cases like
> > > > > side-effects
> > > > > > (e.g. for each processor), where input/output and processors
> > > latencies
> > > > > can
> > > > > > be recorded. This is why I have been looking for how to decorate
> > the
> > > > > > `ProcessorSupplier` and all the changes shown in the comparison.
> > Here
> > > > is
> > > > > a
> > > > > > gist of how we are planning to decorate the `addProcessor`
> method:
> > > > > > https://github.com/openzipkin/brave/compare/master...jeqo:
> > > > > > kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > > > > >
> > > > > > Hope this makes a bit more sense now :)
> > > > > >
> > > > > > El dom., 16 sept. 2018 a las 20:51, Matthias J. Sax (<
> > > > > > matthias@confluent.io>)
> > > > > > escribió:
> > > > > >
> > > > > > > >> I'm experimenting on how to add tracing to Kafka Streams.
> > > > > > >
> > > > > > > What do you mean by this exactly? Is there a JIRA? I am fine
> > > removing
> > > > > > > `final` from `InternalTopologyBuilder#addProcessor()` -- it's
> an
> > > > > > > internal class.
> > > > > > >
> > > > > > > However, the diff also shows
> > > > > > >
> > > > > > > > public Topology(final InternalTopologyBuilder
> > > > > internalTopologyBuilder)
> > > > > > {
> > > > > > >
> > > > > > > This has two impacts: first, it modifies `Topology` what is
> part
> > of
> > > > > > > public API and would require a KIP. Second, it exposes
> > > > > > > `InternalTopologyBuilder` as part of the public API --
> something
> > we
> > > > > > > should not do.
> > > > > > >
> > > > > > > I am also not sure, why you want to do this (btw: also public
> API
> > > > > change
> > > > > > > requiring a KIP). However, this should not be necessary.
> > > > > > >
> > > > > > > >     public StreamsBuilder(final Topology topology)  {
> > > > > > >
> > > > > > >
> > > > > > > I think I am lacking some context what you try to achieve.
> Maybe
> > > you
> > > > > can
> > > > > > > elaborate in the problem you try to solve?
> > > > > > >
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > I'm experimenting on how to add tracing to Kafka Streams.
> > > > > > > >
> > > > > > > > One option is to override and access
> > > > > > > > `InternalTopologyBuilder#addProcessor`. Currently this method
> > > it is
> > > > > > > final,
> > > > > > > > and builder is not exposed as part of `StreamsBuilder`:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > public class StreamsBuilder {
> > > > > > > >
> > > > > > > >     /** The actual topology that is constructed by this
> > > > > StreamsBuilder.
> > > > > > > */
> > > > > > > >     private final Topology topology = new Topology();
> > > > > > > >
> > > > > > > >     /** The topology's internal builder. */
> > > > > > > >     final InternalTopologyBuilder internalTopologyBuilder =
> > > > > > > > topology.internalTopologyBuilder;
> > > > > > > >
> > > > > > > >     private final InternalStreamsBuilder
> > internalStreamsBuilder =
> > > > new
> > > > > > > > InternalStreamsBuilder(internalTopologyBuilder);
> > > > > > > > ```
> > > > > > > >
> > > > > > > > The goal is that If `builder#addProcessor` is exposed, we
> could
> > > > > > decorate
> > > > > > > > every `ProcessorSupplier` and capture traces from it:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > @Override
> > > > > > > >   public void addProcessor(String name, ProcessorSupplier
> > > supplier,
> > > > > > > > String... predecessorNames) {
> > > > > > > >     super.addProcessor(name, new TracingProcessorSupplier(
> > > tracer,
> > > > > > name,
> > > > > > > > supplier), predecessorNames);
> > > > > > > >   }
> > > > > > > > ```
> > > > > > > >
> > > > > > > > Would it make sense to propose this as a change:
> > > > > > > >
> > > > >
> > https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology
> > > > > > ?
> > > > > > > or
> > > > > > > > maybe there is a better way to do this?
> > > > > > > > TopologyWrapper does something similar:
> > > > > > > >
> > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > > > test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > > > > > >
> > > > > > > > Thanks in advance for any help.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Jorge.
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: Accessing Topology Builder

Posted by Bill Bejeck <bb...@gmail.com>.
Jorge:

I have a crazy idea off the top of my head.

Would something as low-tech as using KStream.peek calls on either side of
certain processors to record start and end times work?

Thanks,
Bill

On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wa...@gmail.com> wrote:

> Jorge:
>
> My suggestion was to let your users to implement on the
> TracingProcessorSupplier
> / TracingProcessor directly instead of the base-line ProcessorSupplier /
> Processor. Would that work for you?
>
>
> Guozhang
>
>
> On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
> quilcate.jorge@gmail.com> wrote:
>
> > Thanks Guozhang and John.
> >
> > @Guozhang:
> >
> > > I'd suggest to provide a
> > > WrapperProcessorSupplier for the users than modifying
> > > InternalStreamsTopology: more specifically, you can provide an
> > > `abstract WrapperProcessorSupplier
> > > implements ProcessorSupplier` and then let users to instantiate this
> > class
> > > instead of the "bare-metal" interface. WDYT?
> >
> > Yes, in the gist, I have a class implementing `ProcessorSupplier`:
> >
> > ```
> > public class TracingProcessorSupplier<K, V> implements
> ProcessorSupplier<K,
> > V> {
> >   final KafkaTracing kafkaTracing;
> >   final String name;
> >   final ProcessorSupplier<K, V> delegate;
> >    public TracingProcessorSupplier(KafkaTracing kafkaTracing,
> >       String name, ProcessorSupplier<K, V> delegate) {
> >     this.kafkaTracing = kafkaTracing;
> >     this.name = name;
> >     this.delegate = delegate;
> >   }
> >    @Override public Processor<K, V> get() {
> >     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
> >   }
> > }
> > ```
> >
> > My challenge is how to wrap Topology Processors created by
> > `StreamsBuilder#build` to make this instrumentation easy to adopt by
> Kafka
> > Streams users.
> >
> > @John:
> >
> > > The diff you posted only contains the library-side changes, and it's
> not
> > > obvious how you would use this to insert the desired tracing code.
> > > Perhaps you could provide a snippet demonstrating how you want to use
> > this
> > > change to enable tracing?
> >
> > My first approach was something like this:
> >
> > ```
> > final StreamsBuilder builder = kafkaStreamsTracing.builder();
> > ```
> >
> > Where `KafkaStreamsTracing#builder` looks like this:
> >
> > ```
> >   public StreamsBuilder builder() {
> >     return new StreamsBuilder(new Topology(new
> > TracingInternalTopologyBuilder(kafkaTracing)));
> >   }
> > ```
> >
> > Then, once the builder creates a topology, `processors` will be wrapped
> by
> > `TracingProcessorSupplier` described above.
> >
> > Probably this approach is too naive but works as an initial proof of
> > concept.
> >
> > > Off the top of my head, here are some other approaches you might
> > evaluate:
> > > * you mentioned interceptors. Perhaps we could create a
> > > ProcessorInterceptor interface and add a config to set it.
> >
> > This sounds very interesting to me. Then we won't need to touch internal
> > API's, and just provide some configs. One challenge here is how to define
> > the hooks. In consumer/producer, lifecycle is clear,
> `onConsumer`/`onSend`
> > and then `onCommit`/`onAck` methods. For Stream processors, how this will
> > look like? Maybe `beforeProcess(context, key, value)` and
> > `afterProcess(context, key, value)`.
> >
> > > * perhaps we could simply build the tracing headers into Streams. Is
> > there
> > > a benefit to making it customizable?
> >
> > I don't understand this option completely. Do you mean something like
> > KIP-159 (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 159%3A+Introducing+Rich+functions+to+Streams
> > )?
> > Headers available on StreamsDSL will allow users to create "custom"
> traces,
> > for instance:
> >
> > ```
> > stream.map( (headers, k, v) -> {
> >   Span span = kafkaTracing.nextSpan(headers).start();
> >   doSomething(k, v);
> >   span.finish();
> > }
> > ```
> >
> > but it won't be possible to instrument the existing processors exposed by
> > DSL only by enabling headers on Streams DSL.
> >
> > If we can define a way to pass a `ProcessorSupplier` to be used by
> > `StreamsBuilder#internalTopology` -not sure if via constructor or some
> > other way- would be enough to support this use-case.
> >
> > > Also, as Matthias said, you would need to create a KIP to propose this
> > > change, but of course we can continue this preliminary discussion until
> > you
> > > feel confident to create the KIP.
> >
> > Happy to do it once the approach is clearer.
> >
> > Cheers,
> > Jorge.
> >
> > El lun., 17 sept. 2018 a las 17:09, John Roesler (<jo...@confluent.io>)
> > escribió:
> >
> > > If I understand the request, it's about tracking the latencies for a
> > > specific record, not the aggregated latencies for each processor.
> > >
> > > Jorge,
> > >
> > > The diff you posted only contains the library-side changes, and it's
> not
> > > obvious how you would use this to insert the desired tracing code.
> > > Perhaps you could provide a snippet demonstrating how you want to use
> > this
> > > change to enable tracing?
> > >
> > > Also, as Matthias said, you would need to create a KIP to propose this
> > > change, but of course we can continue this preliminary discussion until
> > you
> > > feel confident to create the KIP.
> > >
> > > Off the top of my head, here are some other approaches you might
> > evaluate:
> > > * you mentioned interceptors. Perhaps we could create a
> > > ProcessorInterceptor interface and add a config to set it.
> > > * perhaps we could simply build the tracing headers into Streams. Is
> > there
> > > a benefit to making it customizable?
> > >
> > > Thanks for considering this problem!
> > > -John
> > >
> > > On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Hello Jorge,
> > > >
> > > > From the TracingProcessor implementation it seems you want to track
> > > > per-processor processing latency, is that right? If this is the case
> > you
> > > > can actually use the per-processor metrics which include latency
> > sensors.
> > > >
> > > > If you do want to track, for a certain record, what's the latency of
> > > > processing it, then you'd probably need the processor implementation
> in
> > > > your repo. In this case, though, I'd suggest to provide a
> > > > WrapperProcessorSupplier for the users than modifying
> > > > InternalStreamsTopology: more specifically, you can provide an
> > > > `abstract WrapperProcessorSupplier
> > > > implements ProcessorSupplier` and then let users to instantiate this
> > > class
> > > > instead of the "bare-metal" interface. WDYT?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> > > > quilcate.jorge@gmail.com> wrote:
> > > >
> > > > > Thanks for your answer, Matthias!
> > > > >
> > > > > What I'm looking for is something similar to interceptors, but for
> > > Stream
> > > > > Processors.
> > > > >
> > > > > In Zipkin -and probably other tracing implementations as well- we
> are
> > > > using
> > > > > Headers to propagate the context of a trace (i.e. adding metadata
> to
> > > the
> > > > > Kafka Record, so we can create references to a trace).
> > > > > Now that Headers are part of Kafka Streams Processor API, we can
> > > > propagate
> > > > > context from input (Consumers) to outputs (Producers) by using
> > > > > `KafkaClientSupplier` (e.g. <
> > > > > https://github.com/openzipkin/brave/blob/master/
> > > > > instrumentation/kafka-streams/src/main/java/brave/kafka/streams/
> > > > > TracingKafkaClientSupplier.java
> > > > > >).
> > > > >
> > > > > "Input to Output" traces could be enough for some use-cases, but we
> > are
> > > > > looking for a more detailed trace -that could cover cases like
> > > > side-effects
> > > > > (e.g. for each processor), where input/output and processors
> > latencies
> > > > can
> > > > > be recorded. This is why I have been looking for how to decorate
> the
> > > > > `ProcessorSupplier` and all the changes shown in the comparison.
> Here
> > > is
> > > > a
> > > > > gist of how we are planning to decorate the `addProcessor` method:
> > > > > https://github.com/openzipkin/brave/compare/master...jeqo:
> > > > > kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > > > >
> > > > > Hope this makes a bit more sense now :)
> > > > >
> > > > > El dom., 16 sept. 2018 a las 20:51, Matthias J. Sax (<
> > > > > matthias@confluent.io>)
> > > > > escribió:
> > > > >
> > > > > > >> I'm experimenting on how to add tracing to Kafka Streams.
> > > > > >
> > > > > > What do you mean by this exactly? Is there a JIRA? I am fine
> > removing
> > > > > > `final` from `InternalTopologyBuilder#addProcessor()` -- it's an
> > > > > > internal class.
> > > > > >
> > > > > > However, the diff also shows
> > > > > >
> > > > > > > public Topology(final InternalTopologyBuilder
> > > > internalTopologyBuilder)
> > > > > {
> > > > > >
> > > > > > This has two impacts: first, it modifies `Topology` what is part
> of
> > > > > > public API and would require a KIP. Second, it exposes
> > > > > > `InternalTopologyBuilder` as part of the public API -- something
> we
> > > > > > should not do.
> > > > > >
> > > > > > I am also not sure, why you want to do this (btw: also public API
> > > > change
> > > > > > requiring a KIP). However, this should not be necessary.
> > > > > >
> > > > > > >     public StreamsBuilder(final Topology topology)  {
> > > > > >
> > > > > >
> > > > > > I think I am lacking some context what you try to achieve. Maybe
> > you
> > > > can
> > > > > > elaborate in the problem you try to solve?
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > I'm experimenting on how to add tracing to Kafka Streams.
> > > > > > >
> > > > > > > One option is to override and access
> > > > > > > `InternalTopologyBuilder#addProcessor`. Currently this method
> > it is
> > > > > > final,
> > > > > > > and builder is not exposed as part of `StreamsBuilder`:
> > > > > > >
> > > > > > > ```
> > > > > > > public class StreamsBuilder {
> > > > > > >
> > > > > > >     /** The actual topology that is constructed by this
> > > > StreamsBuilder.
> > > > > > */
> > > > > > >     private final Topology topology = new Topology();
> > > > > > >
> > > > > > >     /** The topology's internal builder. */
> > > > > > >     final InternalTopologyBuilder internalTopologyBuilder =
> > > > > > > topology.internalTopologyBuilder;
> > > > > > >
> > > > > > >     private final InternalStreamsBuilder
> internalStreamsBuilder =
> > > new
> > > > > > > InternalStreamsBuilder(internalTopologyBuilder);
> > > > > > > ```
> > > > > > >
> > > > > > > The goal is that If `builder#addProcessor` is exposed, we could
> > > > > decorate
> > > > > > > every `ProcessorSupplier` and capture traces from it:
> > > > > > >
> > > > > > > ```
> > > > > > > @Override
> > > > > > >   public void addProcessor(String name, ProcessorSupplier
> > supplier,
> > > > > > > String... predecessorNames) {
> > > > > > >     super.addProcessor(name, new TracingProcessorSupplier(
> > tracer,
> > > > > name,
> > > > > > > supplier), predecessorNames);
> > > > > > >   }
> > > > > > > ```
> > > > > > >
> > > > > > > Would it make sense to propose this as a change:
> > > > > > >
> > > >
> https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology
> > > > > ?
> > > > > > or
> > > > > > > maybe there is a better way to do this?
> > > > > > > TopologyWrapper does something similar:
> > > > > > >
> > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > > test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > > > > >
> > > > > > > Thanks in advance for any help.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Jorge.
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Accessing Topology Builder

Posted by Guozhang Wang <wa...@gmail.com>.
Jorge:

My suggestion was to let your users implement the TracingProcessorSupplier /
TracingProcessor directly instead of the baseline ProcessorSupplier /
Processor. Would that work for you?
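
Read this way, the tracing library would ship a base class that users extend
instead of implementing `Processor` themselves. Below is a minimal sketch of
that shape, not the actual Brave or Kafka code: every name is hypothetical, the
Kafka `Processor` contract is reduced to a single method, and a
`Consumer<String>` stands in for a real tracer.

```java
import java.util.function.Consumer;

// Hypothetical base class users would extend instead of the plain Processor interface.
abstract class TracedProcessorBase<K, V> {
  private final String name;
  private final Consumer<String> spanSink; // stand-in for a real tracer (assumption)

  protected TracedProcessorBase(String name, Consumer<String> spanSink) {
    this.name = name;
    this.spanSink = spanSink;
  }

  // Entry point for the runtime; final so subclasses cannot bypass the timing.
  public final void process(K key, V value) {
    long start = System.nanoTime();
    try {
      doProcess(key, value);
    } finally {
      // Reported even when doProcess throws.
      spanSink.accept(name + " took " + (System.nanoTime() - start) + "ns");
    }
  }

  // User logic goes here.
  protected abstract void doProcess(K key, V value);
}
```

Note that this shape only covers processors the user writes by hand; processors
generated by the DSL would not pass through such a base class.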


Guozhang


On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
quilcate.jorge@gmail.com> wrote:

> Thanks Guozhang and John.
>
> @Guozhang:
>
> > I'd suggest to provide a
> > WrapperProcessorSupplier for the users than modifying
> > InternalStreamsTopology: more specifically, you can provide an
> > `abstract WrapperProcessorSupplier
> > implements ProcessorSupplier` and then let users to instantiate this
> class
> > instead of the "bare-metal" interface. WDYT?
>
> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
>
> ```
> public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K,
> V> {
>   final KafkaTracing kafkaTracing;
>   final String name;
>   final ProcessorSupplier<K, V> delegate;
>    public TracingProcessorSupplier(KafkaTracing kafkaTracing,
>       String name, ProcessorSupplier<K, V> delegate) {
>     this.kafkaTracing = kafkaTracing;
>     this.name = name;
>     this.delegate = delegate;
>   }
>    @Override public Processor<K, V> get() {
>     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
>   }
> }
> ```
>
> My challenge is how to wrap Topology Processors created by
> `StreamsBuilder#build` to make this instrumentation easy to adopt by Kafka
> Streams users.
>
> @John:
>
> > The diff you posted only contains the library-side changes, and it's not
> > obvious how you would use this to insert the desired tracing code.
> > Perhaps you could provide a snippet demonstrating how you want to use
> this
> > change to enable tracing?
>
> My first approach was something like this:
>
> ```
> final StreamsBuilder builder = kafkaStreamsTracing.builder();
> ```
>
> Where `KafkaStreamsTracing#builder` looks like this:
>
> ```
>   public StreamsBuilder builder() {
>     return new StreamsBuilder(new Topology(new
> TracingInternalTopologyBuilder(kafkaTracing)));
>   }
> ```
>
> Then, once the builder creates a topology, `processors` will be wrapped by
> `TracingProcessorSupplier` described above.
>
> Probably this approach is too naive but works as an initial proof of
> concept.
>
> > Off the top of my head, here are some other approaches you might
> evaluate:
> > * you mentioned interceptors. Perhaps we could create a
> > ProcessorInterceptor interface and add a config to set it.
>
> This sounds very interesting to me. Then we won't need to touch internal
> API's, and just provide some configs. One challenge here is how to define
> the hooks. In consumer/producer, lifecycle is clear, `onConsumer`/`onSend`
> and then `onCommit`/`onAck` methods. For Stream processors, how this will
> look like? Maybe `beforeProcess(context, key, value)` and
> `afterProcess(context, key, value)`.
>
> > * perhaps we could simply build the tracing headers into Streams. Is
> there
> > a benefit to making it customizable?
>
> I don't understand this option completely. Do you mean something like
> KIP-159 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 159%3A+Introducing+Rich+functions+to+Streams
> )?
> Headers available on StreamsDSL will allow users to create "custom" traces,
> for instance:
>
> ```
> stream.map( (headers, k, v) -> {
>   Span span = kafkaTracing.nextSpan(headers).start();
>   doSomething(k, v);
>   span.finish();
> }
> ```
>
> but it won't be possible to instrument the existing processors exposed by
> DSL only by enabling headers on Streams DSL.
>
> If we can define a way to pass a `ProcessorSupplier` to be used by
> `StreamsBuilder#internalTopology` -not sure if via constructor or some
> other way- would be enough to support this use-case.
>
> > Also, as Matthias said, you would need to create a KIP to propose this
> > change, but of course we can continue this preliminary discussion until
> you
> > feel confident to create the KIP.
>
> Happy to do it once the approach is clearer.
>
> Cheers,
> Jorge.
>
> El lun., 17 sept. 2018 a las 17:09, John Roesler (<jo...@confluent.io>)
> escribió:
>
> > If I understand the request, it's about tracking the latencies for a
> > specific record, not the aggregated latencies for each processor.
> >
> > Jorge,
> >
> > The diff you posted only contains the library-side changes, and it's not
> > obvious how you would use this to insert the desired tracing code.
> > Perhaps you could provide a snippet demonstrating how you want to use
> this
> > change to enable tracing?
> >
> > Also, as Matthias said, you would need to create a KIP to propose this
> > change, but of course we can continue this preliminary discussion until
> you
> > feel confident to create the KIP.
> >
> > Off the top of my head, here are some other approaches you might
> evaluate:
> > * you mentioned interceptors. Perhaps we could create a
> > ProcessorInterceptor interface and add a config to set it.
> > * perhaps we could simply build the tracing headers into Streams. Is
> there
> > a benefit to making it customizable?
> >
> > Thanks for considering this problem!
> > -John
> >
> > On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hello Jorge,
> > >
> > > From the TracingProcessor implementation it seems you want to track
> > > per-processor processing latency, is that right? If this is the case
> you
> > > can actually use the per-processor metrics which include latency
> sensors.
> > >
> > > If you do want to track, for a certain record, what's the latency of
> > > processing it, then you'd probably need the processor implementation in
> > > your repo. In this case, though, I'd suggest to provide a
> > > WrapperProcessorSupplier for the users than modifying
> > > InternalStreamsTopology: more specifically, you can provide an
> > > `abstract WrapperProcessorSupplier
> > > implements ProcessorSupplier` and then let users to instantiate this
> > class
> > > instead of the "bare-metal" interface. WDYT?
> > >
> > >
> > > Guozhang
> > >
> > > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> > > quilcate.jorge@gmail.com> wrote:
> > >
> > > > Thanks for your answer, Matthias!
> > > >
> > > > What I'm looking for is something similar to interceptors, but for
> > Stream
> > > > Processors.
> > > >
> > > > In Zipkin -and probably other tracing implementations as well- we are
> > > using
> > > > Headers to propagate the context of a trace (i.e. adding metadata to
> > the
> > > > Kafka Record, so we can create references to a trace).
> > > > Now that Headers are part of Kafka Streams Processor API, we can
> > > propagate
> > > > context from input (Consumers) to outputs (Producers) by using
> > > > `KafkaClientSupplier` (e.g. <
> > > > https://github.com/openzipkin/brave/blob/master/
> > > > instrumentation/kafka-streams/src/main/java/brave/kafka/streams/
> > > > TracingKafkaClientSupplier.java
> > > > >).
> > > >
> > > > "Input to Output" traces could be enough for some use-cases, but we
> are
> > > > looking for a more detailed trace -that could cover cases like
> > > side-effects
> > > > (e.g. for each processor), where input/output and processors
> latencies
> > > can
> > > > be recorded. This is why I have been looking for how to decorate the
> > > > `ProcessorSupplier` and all the changes shown in the comparison. Here
> > is
> > > a
> > > > gist of how we are planning to decorate the `addProcessor` method:
> > > > https://github.com/openzipkin/brave/compare/master...jeqo:
> > > > kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > > >
> > > > Hope this makes a bit more sense now :)
> > > >
> > > > El dom., 16 sept. 2018 a las 20:51, Matthias J. Sax (<
> > > > matthias@confluent.io>)
> > > > escribió:
> > > >
> > > > > >> I'm experimenting on how to add tracing to Kafka Streams.
> > > > >
> > > > > What do you mean by this exactly? Is there a JIRA? I am fine
> removing
> > > > > `final` from `InternalTopologyBuilder#addProcessor()` -- it's an
> > > > > internal class.
> > > > >
> > > > > However, the diff also shows
> > > > >
> > > > > > public Topology(final InternalTopologyBuilder
> > > internalTopologyBuilder)
> > > > {
> > > > >
> > > > > This has two impacts: first, it modifies `Topology` what is part of
> > > > > public API and would require a KIP. Second, it exposes
> > > > > `InternalTopologyBuilder` as part of the public API -- something we
> > > > > should not do.
> > > > >
> > > > > I am also not sure, why you want to do this (btw: also public API
> > > change
> > > > > requiring a KIP). However, this should not be necessary.
> > > > >
> > > > > >     public StreamsBuilder(final Topology topology)  {
> > > > >
> > > > >
> > > > > I think I am lacking some context what you try to achieve. Maybe
> you
> > > can
> > > > > elaborate in the problem you try to solve?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > Hi everyone,
> > > > > >
> > > > > > I'm experimenting on how to add tracing to Kafka Streams.
> > > > > >
> > > > > > One option is to override and access
> > > > > > `InternalTopologyBuilder#addProcessor`. Currently this method
> it is
> > > > > final,
> > > > > > and builder is not exposed as part of `StreamsBuilder`:
> > > > > >
> > > > > > ```
> > > > > > public class StreamsBuilder {
> > > > > >
> > > > > >     /** The actual topology that is constructed by this
> > > StreamsBuilder.
> > > > > */
> > > > > >     private final Topology topology = new Topology();
> > > > > >
> > > > > >     /** The topology's internal builder. */
> > > > > >     final InternalTopologyBuilder internalTopologyBuilder =
> > > > > > topology.internalTopologyBuilder;
> > > > > >
> > > > > >     private final InternalStreamsBuilder internalStreamsBuilder =
> > new
> > > > > > InternalStreamsBuilder(internalTopologyBuilder);
> > > > > > ```
> > > > > >
> > > > > > The goal is that If `builder#addProcessor` is exposed, we could
> > > > decorate
> > > > > > every `ProcessorSupplier` and capture traces from it:
> > > > > >
> > > > > > ```
> > > > > > @Override
> > > > > >   public void addProcessor(String name, ProcessorSupplier
> supplier,
> > > > > > String... predecessorNames) {
> > > > > >     super.addProcessor(name, new TracingProcessorSupplier(
> tracer,
> > > > name,
> > > > > > supplier), predecessorNames);
> > > > > >   }
> > > > > > ```
> > > > > >
> > > > > > Would it make sense to propose this as a change:
> > > > > >
> > > https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology
> > > > ?
> > > > > or
> > > > > > maybe there is a better way to do this?
> > > > > > TopologyWrapper does something similar:
> > > > > >
> > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > > > >
> > > > > > Thanks in advance for any help.
> > > > > >
> > > > > > Cheers,
> > > > > > Jorge.
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang

Re: Accessing Topology Builder

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Thanks Guozhang and John.

@Guozhang:

> I'd suggest to provide a
> WrapperProcessorSupplier for the users than modifying
> InternalStreamsTopology: more specifically, you can provide an
> `abstract WrapperProcessorSupplier
> implements ProcessorSupplier` and then let users to instantiate this class
> instead of the "bare-metal" interface. WDYT?

Yes, in the gist, I have a class implementing `ProcessorSupplier`:

```
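import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

// Wraps a ProcessorSupplier so that every Processor it creates is traced.
// KafkaTracing and TracingProcessor come from the Brave instrumentation gist.
public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
  final KafkaTracing kafkaTracing;
  final String name;
  final ProcessorSupplier<K, V> delegate;

  public TracingProcessorSupplier(KafkaTracing kafkaTracing,
      String name, ProcessorSupplier<K, V> delegate) {
    this.kafkaTracing = kafkaTracing;
    this.name = name;
    this.delegate = delegate;
  }

  @Override public Processor<K, V> get() {
    // Decorate the processor created by the wrapped supplier.
    return new TracingProcessor<>(kafkaTracing, name, delegate.get());
  }
}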
```

My challenge is how to wrap the processors that `StreamsBuilder#build`
creates, so that this instrumentation is easy for Kafka Streams users to
adopt.

@John:

> The diff you posted only contains the library-side changes, and it's not
> obvious how you would use this to insert the desired tracing code.
> Perhaps you could provide a snippet demonstrating how you want to use this
> change to enable tracing?

My first approach was something like this:

```
final StreamsBuilder builder = kafkaStreamsTracing.builder();
```

Where `KafkaStreamsTracing#builder` looks like this:

```
  public StreamsBuilder builder() {
    return new StreamsBuilder(
        new Topology(new TracingInternalTopologyBuilder(kafkaTracing)));
  }
```

Then, when the builder creates a topology, each processor's supplier is
wrapped by the `TracingProcessorSupplier` described above.

This approach is probably too naive, but it works as an initial proof of
concept.
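
The wrapping step itself can be sketched without any Kafka classes. Everything
below is a stand-in and an assumption: `MiniTopologyBuilder` is a toy
replacement for `InternalTopologyBuilder`, `Supplier<Runnable>` plays the role
of `ProcessorSupplier`, and the `trace` list is a placeholder for a tracer.
Only the override-and-decorate shape is the point.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Toy stand-in for InternalTopologyBuilder: it only remembers suppliers by node name.
class MiniTopologyBuilder {
  final Map<String, Supplier<Runnable>> nodes = new LinkedHashMap<>();

  void addProcessor(String name, Supplier<Runnable> supplier, String... predecessorNames) {
    nodes.put(name, supplier);
  }
}

// Subclass that decorates every supplier on its way into the topology.
class TracingMiniTopologyBuilder extends MiniTopologyBuilder {
  final List<String> trace = new ArrayList<>(); // hypothetical trace sink

  @Override
  void addProcessor(String name, Supplier<Runnable> supplier, String... predecessorNames) {
    super.addProcessor(name, () -> {
      Runnable delegate = supplier.get();
      // Return a processor that brackets the delegate with trace events.
      return () -> {
        trace.add("start " + name);
        try {
          delegate.run();
        } finally {
          trace.add("finish " + name);
        }
      };
    }, predecessorNames);
  }
}
```

Because the decoration happens inside `addProcessor`, callers keep registering
plain suppliers and never see the tracing wrapper.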

> Off the top of my head, here are some other approaches you might evaluate:
> * you mentioned interceptors. Perhaps we could create a
> ProcessorInterceptor interface and add a config to set it.

This sounds very interesting to me. Then we wouldn't need to touch internal
APIs, just provide some configs. One challenge here is how to define
the hooks. For consumers and producers the lifecycle is clear:
`onConsume`/`onSend` and then `onCommit`/`onAcknowledgement`. What would it
look like for stream processors? Maybe `beforeProcess(context, key, value)` and
`afterProcess(context, key, value)`.
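
As a discussion aid only, here is one possible shape for such hooks. Nothing
in it exists in Kafka Streams: `ProcessorInterceptorSketch` and
`InterceptedCall` are hypothetical names, the processor name stands in for the
`context` argument, and a `BiConsumer` stands in for the processor.

```java
import java.util.function.BiConsumer;

// Hypothetical hook interface following the beforeProcess/afterProcess idea.
interface ProcessorInterceptorSketch<K, V> {
  void beforeProcess(String processorName, K key, V value);

  // `error` is null when the processor returned normally.
  void afterProcess(String processorName, K key, V value, Throwable error);
}

// Shows where a runtime could call the hooks around a single process() call.
final class InterceptedCall {
  static <K, V> void process(String processorName,
                             ProcessorInterceptorSketch<K, V> interceptor,
                             BiConsumer<K, V> processor,
                             K key, V value) {
    interceptor.beforeProcess(processorName, key, value);
    Throwable error = null;
    try {
      processor.accept(key, value);
    } catch (RuntimeException | Error e) {
      error = e; // remember the failure for the hook, then rethrow unchanged
      throw e;
    } finally {
      interceptor.afterProcess(processorName, key, value, error);
    }
  }
}
```

Passing the failure to `afterProcess` is a design choice of this sketch: it
lets a tracer tag the span as failed, which a pure before/after pair without
an error argument could not do.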

> * perhaps we could simply build the tracing headers into Streams. Is there
> a benefit to making it customizable?

I don't understand this option completely. Do you mean something like
KIP-159 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
)?
Making headers available in the Streams DSL would allow users to create
"custom" traces, for instance:

```
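// Hypothetical: assumes a DSL overload that passes record headers to the lambda.
stream.map((headers, k, v) -> {
  Span span = kafkaTracing.nextSpan(headers).start();
  doSomething(k, v);
  span.finish();
  return KeyValue.pair(k, v);
});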
```

but enabling headers in the Streams DSL alone would not make it possible to
instrument the existing processors exposed by the DSL.

If we could define a way to pass a `ProcessorSupplier` to be used by
`StreamsBuilder#internalTopologyBuilder` (not sure if via constructor or
some other way), that would be enough to support this use-case.

> Also, as Matthias said, you would need to create a KIP to propose this
> change, but of course we can continue this preliminary discussion until you
> feel confident to create the KIP.

Happy to do it once the approach is clearer.

Cheers,
Jorge.

On Mon, Sep 17, 2018 at 17:09, John Roesler (<jo...@confluent.io>)
wrote:

> If I understand the request, it's about tracking the latencies for a
> specific record, not the aggregated latencies for each processor.
>
> Jorge,
>
> The diff you posted only contains the library-side changes, and it's not
> obvious how you would use this to insert the desired tracing code.
> Perhaps you could provide a snippet demonstrating how you want to use this
> change to enable tracing?
>
> Also, as Matthias said, you would need to create a KIP to propose this
> change, but of course we can continue this preliminary discussion until you
> feel confident to create the KIP.
>
> Off the top of my head, here are some other approaches you might evaluate:
> * you mentioned interceptors. Perhaps we could create a
> ProcessorInterceptor interface and add a config to set it.
> * perhaps we could simply build the tracing headers into Streams. Is there
> a benefit to making it customizable?
>
> Thanks for considering this problem!
> -John
>
> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Jorge,
> >
> > From the TracingProcessor implementation it seems you want to track
> > per-processor processing latency, is that right? If this is the case you
> > can actually use the per-processor metrics which include latency sensors.
> >
> > If you do want to track, for a certain record, what's the latency of
> > processing it, then you'd probably need the processor implementation in
> > your repo. In this case, though, I'd suggest to provide a
> > WrapperProcessorSupplier for the users than modifying
> > InternalStreamsTopology: more specifically, you can provide an
> > `abstract WrapperProcessorSupplier
> > implements ProcessorSupplier` and then let users to instantiate this class
> > instead of the "bare-metal" interface. WDYT?
> >
> >
> > Guozhang
> >
> > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> > quilcate.jorge@gmail.com> wrote:
> >
> > > Thanks for your answer, Matthias!
> > >
> > > What I'm looking for is something similar to interceptors, but for Stream
> > > Processors.
> > >
> > > In Zipkin -and probably other tracing implementations as well- we are
> > using
> > > Headers to propagate the context of a trace (i.e. adding metadata to
> the
> > > Kafka Record, so we can create references to a trace).
> > > Now that Headers are part of Kafka Streams Processor API, we can
> > propagate
> > > context from input (Consumers) to outputs (Producers) by using
> > > `KafkaClientSupplier` (e.g. <
> > > https://github.com/openzipkin/brave/blob/master/
> > > instrumentation/kafka-streams/src/main/java/brave/kafka/streams/
> > > TracingKafkaClientSupplier.java
> > > >).
> > >
> > > "Input to Output" traces could be enough for some use-cases, but we are
> > > looking for a more detailed trace -that could cover cases like
> > side-effects
> > > (e.g. for each processor), where input/output and processors latencies
> > can
> > > be recorded. This is why I have been looking for how to decorate the
> > > `ProcessorSupplier` and all the changes shown in the comparison. Here
> is
> > a
> > > gist of how we are planning to decorate the `addProcessor` method:
> > > https://github.com/openzipkin/brave/compare/master...jeqo:
> > > kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > >
> > > Hope this makes a bit more sense now :)
> > >
> > > On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<
> > > matthias@confluent.io>)
> > > wrote:
> > >
> > > > >> I'm experimenting on how to add tracing to Kafka Streams.
> > > >
> > > > What do you mean by this exactly? Is there a JIRA? I am fine removing
> > > > `final` from `InternalTopologyBuilder#addProcessor()` -- it's an
> > > > internal class.
> > > >
> > > > However, the diff also shows
> > > >
> > > > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> > > >
> > > > This has two impacts: first, it modifies `Topology` what is part of
> > > > public API and would require a KIP. Second, it exposes
> > > > `InternalTopologyBuilder` as part of the public API -- something we
> > > > should not do.
> > > >
> > > > I am also not sure, why you want to do this (btw: also public API
> > change
> > > > requiring a KIP). However, this should not be necessary.
> > > >
> > > > >     public StreamsBuilder(final Topology topology)  {
> > > >
> > > >
> > > > I think I am lacking some context what you try to achieve. Maybe you
> > can
> > > > elaborate in the problem you try to solve?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > Hi everyone,
> > > > >
> > > > > I'm experimenting on how to add tracing to Kafka Streams.
> > > > >
> > > > > One option is to override and access
> > > > > `InternalTopologyBuilder#addProcessor`. Currently this method it is
> > > > final,
> > > > > and builder is not exposed as part of `StreamsBuilder`:
> > > > >
> > > > > ```
> > > > > public class StreamsBuilder {
> > > > >
> > > > >     /** The actual topology that is constructed by this
> > StreamsBuilder.
> > > > */
> > > > >     private final Topology topology = new Topology();
> > > > >
> > > > >     /** The topology's internal builder. */
> > > > >     final InternalTopologyBuilder internalTopologyBuilder =
> > > > > topology.internalTopologyBuilder;
> > > > >
> > > > >     private final InternalStreamsBuilder internalStreamsBuilder =
> new
> > > > > InternalStreamsBuilder(internalTopologyBuilder);
> > > > > ```
> > > > >
> > > > > The goal is that If `builder#addProcessor` is exposed, we could
> > > decorate
> > > > > every `ProcessorSupplier` and capture traces from it:
> > > > >
> > > > > ```
> > > > > @Override
> > > > >   public void addProcessor(String name, ProcessorSupplier supplier,
> > > > > String... predecessorNames) {
> > > > >     super.addProcessor(name, new TracingProcessorSupplier(tracer,
> > > name,
> > > > > supplier), predecessorNames);
> > > > >   }
> > > > > ```
> > > > >
> > > > > Would it make sense to propose this as a change:
> > > > >
> > https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology
> > > ?
> > > > or
> > > > > maybe there is a better way to do this?
> > > > > TopologyWrapper does something similar:
> > > > >
> > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > > >
> > > > > Thanks in advance for any help.
> > > > >
> > > > > Cheers,
> > > > > Jorge.
> > > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: Accessing Topology Builder

Posted by John Roesler <jo...@confluent.io>.
If I understand the request, it's about tracking the latencies for a
specific record, not the aggregated latencies for each processor.

Jorge,

The diff you posted only contains the library-side changes, and it's not
obvious how you would use this to insert the desired tracing code.
Perhaps you could provide a snippet demonstrating how you want to use this
change to enable tracing?

Also, as Matthias said, you would need to create a KIP to propose this
change, but of course we can continue this preliminary discussion until you
feel confident to create the KIP.

Off the top of my head, here are some other approaches you might evaluate:
* you mentioned interceptors. Perhaps we could create a
ProcessorInterceptor interface and add a config to set it.
* perhaps we could simply build the tracing headers into Streams. Is there
a benefit to making it customizable?

Thanks for considering this problem!
-John

Re: Accessing Topology Builder

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Jorge,

From the TracingProcessor implementation it seems you want to track
per-processor processing latency, is that right? If this is the case you
can actually use the per-processor metrics which include latency sensors.
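
A minimal configuration sketch for enabling those metrics. It assumes the
Kafka Streams version in use records processor-node sensors only at the
`DEBUG` recording level; the application id and bootstrap address are
hypothetical placeholders.

```java
import java.util.Properties;

final class MetricsConfigSketch {
  // Builds a Streams configuration with debug-level metrics enabled.
  static Properties debugMetricsConfig() {
    Properties props = new Properties();
    props.put("application.id", "tracing-demo");        // hypothetical id
    props.put("bootstrap.servers", "localhost:9092");   // hypothetical address
    // Key behind StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG; the default is INFO.
    props.put("metrics.recording.level", "DEBUG");
    return props;
  }
}
```

These metrics are aggregated per processor node, so they do not show the
path of an individual record through the topology.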

If you do want to track, for a certain record, what the latency of
processing it is, then you'd probably need the processor implementation in
your repo. In this case, though, I'd suggest providing a
WrapperProcessorSupplier for users rather than modifying
InternalStreamsTopology: more specifically, you can provide an
`abstract WrapperProcessorSupplier
implements ProcessorSupplier` and then let users instantiate this class
instead of the "bare-metal" interface. WDYT?
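
One way this suggestion could be sketched. `Proc` and `ProcSupplier` are
stand-ins for the Kafka Streams `Processor` and `ProcessorSupplier`
interfaces so that the example compiles on its own, and
`TimingProcessorSupplier` is a hypothetical subclass added only for
illustration.

```java
// Stand-ins for the Kafka Streams Processor and ProcessorSupplier interfaces.
interface Proc<K, V> {
  void process(K key, V value);
}

interface ProcSupplier<K, V> {
  Proc<K, V> get();
}

// Base class users extend instead of implementing the bare supplier interface.
abstract class WrapperProcessorSupplier<K, V> implements ProcSupplier<K, V> {
  private final ProcSupplier<K, V> delegate;

  protected WrapperProcessorSupplier(ProcSupplier<K, V> delegate) {
    this.delegate = delegate;
  }

  // Subclasses decide how each new processor instance is decorated.
  protected abstract Proc<K, V> wrap(Proc<K, V> processor);

  @Override public final Proc<K, V> get() {
    return wrap(delegate.get());
  }
}

// Hypothetical subclass that measures the time spent in process().
final class TimingProcessorSupplier<K, V> extends WrapperProcessorSupplier<K, V> {
  long totalNanos = 0;
  long calls = 0;

  TimingProcessorSupplier(ProcSupplier<K, V> delegate) {
    super(delegate);
  }

  @Override protected Proc<K, V> wrap(Proc<K, V> processor) {
    return (key, value) -> {
      long start = System.nanoTime();
      try {
        processor.process(key, value);
      } finally {
        totalNanos += System.nanoTime() - start;
        calls++;
      }
    };
  }
}
```

With this shape, only processors that users pass in explicitly are wrapped;
processors created internally by the DSL are not covered.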


Guozhang

-- 
-- Guozhang

Re: Accessing Topology Builder

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Thanks for your answer, Matthias!

What I'm looking for is something similar to interceptors, but for Stream
Processors.

In Zipkin -and probably other tracing implementations as well- we are using
Headers to propagate the context of a trace (i.e. adding metadata to the
Kafka Record, so we can create references to a trace).
Now that Headers are part of Kafka Streams Processor API, we can propagate
context from input (Consumers) to outputs (Producers) by using
`KafkaClientSupplier` (e.g. <
https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java
>).
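
A minimal sketch of the idea of carrying trace context in record headers.
`SketchRecord` is a hypothetical stand-in for a Kafka record, and the
header key follows Zipkin's B3 naming; whether the Kafka instrumentation
uses exactly this key is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Kafka record with headers.
final class SketchRecord {
  final String key;
  final String value;
  final Map<String, byte[]> headers = new HashMap<>();

  SketchRecord(String key, String value) {
    this.key = key;
    this.value = value;
  }
}

final class TracePropagation {
  // Assumed header key, following B3 naming.
  static final String TRACE_ID = "X-B3-TraceId";

  // Creates an output record and copies the trace id from the input, if present.
  static SketchRecord forward(SketchRecord input, String newValue) {
    SketchRecord output = new SketchRecord(input.key, newValue);
    byte[] traceId = input.headers.get(TRACE_ID);
    if (traceId != null) {
      output.headers.put(TRACE_ID, traceId.clone());
    }
    return output;
  }

  // Convenience helper for reading the trace id as text; returns null if absent.
  static String traceIdOf(SketchRecord record) {
    byte[] raw = record.headers.get(TRACE_ID);
    return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
  }
}
```

Because the output record carries the same trace id as the input, spans
recorded by the consumer and the producer can be linked to one trace.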

"Input to Output" traces could be enough for some use-cases, but we are
looking for a more detailed trace that could cover cases like side effects
(e.g. for each processor), where input/output and processor latencies can
be recorded. This is why I have been looking for a way to decorate the
`ProcessorSupplier`, which explains all the changes shown in the comparison.
Here is a gist of how we are planning to decorate the `addProcessor` method:
https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7

Hope this makes a bit more sense now :)

Re: Accessing Topology Builder

Posted by "Matthias J. Sax" <ma...@confluent.io>.
>> I'm experimenting on how to add tracing to Kafka Streams.

What do you mean by this exactly? Is there a JIRA? I am fine removing
`final` from `InternalTopologyBuilder#addProcessor()` -- it's an
internal class.

However, the diff also shows

> public Topology(final InternalTopologyBuilder internalTopologyBuilder) {

This has two impacts: first, it modifies `Topology`, which is part of the
public API, and would require a KIP. Second, it exposes
`InternalTopologyBuilder` as part of the public API -- something we
should not do.

I am also not sure why you want to do this (btw: also a public API change
requiring a KIP). However, this should not be necessary.

>     public StreamsBuilder(final Topology topology)  {


I think I am lacking some context on what you are trying to achieve. Maybe
you can elaborate on the problem you are trying to solve?


-Matthias
