Posted to users@kafka.apache.org by josh gruenberg <jo...@gmail.com> on 2016/03/22 19:51:50 UTC

Sessionizing inputs with Kafka Streams

Hello there,

I've been experimenting with the Kafka Streams preview, and I'm excited
about its features and capabilities! My team is enthusiastic about the
lightweight operational profile, and the support for local state is very
compelling.

However, I'm having trouble designing a solution with KStreams to satisfy a
particular use-case: we want to "Sessionize" a stream of events, by
gathering together inputs that share a common identifier and occur without
a configurable interruption (gap) in event-time.
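
To make the requirement concrete, here is a minimal sketch of gap-based sessionization in plain Java, independent of any streaming framework. The `Event` and `GapSessionizer` names are hypothetical, and the sketch assumes events arrive ordered by event-time:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type: an identifier plus an event-time timestamp (ms).
final class Event {
    final String id;
    final long timestampMs;

    Event(String id, long timestampMs) {
        this.id = id;
        this.timestampMs = timestampMs;
    }
}

final class GapSessionizer {
    // Groups events by id. A new session starts for an id whenever the gap
    // since that id's previous event exceeds maxGapMs.
    // Assumes the input list is ordered by timestamp.
    static Map<String, List<List<Event>>> sessionize(List<Event> events, long maxGapMs) {
        Map<String, List<List<Event>>> sessions = new HashMap<>();
        for (Event e : events) {
            List<List<Event>> forId = sessions.computeIfAbsent(e.id, k -> new ArrayList<>());
            List<Event> current = forId.isEmpty() ? null : forId.get(forId.size() - 1);
            boolean gapExceeded = current != null
                    && e.timestampMs - current.get(current.size() - 1).timestampMs > maxGapMs;
            if (current == null || gapExceeded) {
                current = new ArrayList<>();
                forId.add(current);
            }
            current.add(e);
        }
        return sessions;
    }
}
```

With a 30 ms gap, events for "a" at 0, 10 and 100 would form two sessions: one holding the first two events and one holding the last.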

This is achievable with other streaming frameworks (e.g., using
Beam/Dataflow's "Session" windows, or Spark Streaming's mapWithState with
its "timeout" capability), but I don't see how to approach it with the
current Kafka Streams API.

I've investigated using the aggregateWithKey function, but this doesn't
appear to support data-driven windowing. I've also considered using a
custom Processor to perform the aggregation, but don't see how to take an
output-stream from a Processor and continue to work with it. This area of
the system is undocumented, so I'm not sure how to proceed.

Am I missing something? Do you have any suggestions?

-josh

Re: Sessionizing inputs with Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Josh,

If you have ideas about improving the Transformer / Processor APIs, or about
supporting session windows, please feel free to create a JIRA and start the
discussion there. PRs are also more than welcome :)

Guozhang

On Fri, Mar 25, 2016 at 10:50 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Josh,
>
> We are aware that the Transformer / Processor APIs can be improved; for
> example, the punctuate() function should be able to return a value of the
> same type R that transform() returns.
>
> For now, in your case you can return a sentinel value from transform() and
> add a "filter" right after it to remove the sentinel values.
>
> Guozhang

Re: Sessionizing inputs with Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Josh,

We are aware that the Transformer / Processor APIs can be improved; for
example, the punctuate() function should be able to return a value of the
same type R that transform() returns.

For now, in your case you can return a sentinel value from transform() and
add a "filter" right after it to remove the sentinel values.
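
The sentinel-plus-filter idea can be sketched in plain Java as below. This only models the pattern and does not use the Kafka Streams API: `BufferingTransformer`, `SentinelPipeline`, the `NO_OUTPUT` constant and the "END" marker are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Stand-in for a Transformer that usually has nothing to emit.
final class BufferingTransformer {
    // Sentinel meaning "no output for this record". It is a distinct
    // String instance so it can be recognized by identity (==).
    static final String NO_OUTPUT = new String("NO_OUTPUT");

    private final List<String> buffer = new ArrayList<>();

    // Buffers values until the hypothetical "END" marker arrives, then
    // returns the joined buffer; otherwise returns the sentinel.
    String transform(String value) {
        if (!"END".equals(value)) {
            buffer.add(value);
            return NO_OUTPUT;
        }
        String result = String.join(",", buffer);
        buffer.clear();
        return result;
    }
}

final class SentinelPipeline {
    // Applies the transformer to each input, then drops sentinel values,
    // mirroring a "filter" placed right after "transform".
    static List<String> run(List<String> inputs) {
        BufferingTransformer transformer = new BufferingTransformer();
        List<String> transformed = new ArrayList<>();
        for (String input : inputs) {
            transformed.add(transformer.transform(input));
        }
        return transformed.stream()
                .filter(v -> v != BufferingTransformer.NO_OUTPUT)
                .collect(Collectors.toList());
    }
}
```

Comparing by identity rather than by equals() keeps a real record that happens to carry the same text from being dropped by mistake.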

Guozhang


On Wed, Mar 23, 2016 at 7:02 PM, josh gruenberg <jo...@gmail.com> wrote:

> Thank you, Guozhang! In my exploration, I did overlook the "transform"
> method; this looks promising.
>
> I could still use a little more help: I'm confused because for this
> sessionization use-case, an invocation of the 'transform' method usually
> suggests that a session is still active, so I'll have nothing to emit from
> 'transform'. Instead, I'm guessing I'll need to produce my results from the
> 'punctuate' callback. So my questions are:
>
> 1. What should I return from 'transform' to indicate that I have no output
> at this time? From my reading of 'KStreamTransformProcessor.process', it
> appears that "null" won't fly. Should I return a dummy KeyValue, and then
> filter that out downstream? Seems a little cumbersome, but perhaps not
> terrible as an interim solution... Is there a better way?
> 2. To emit completed aggregations in response to 'punctuate', can I just
> send them via 'context.forward'? (I'll note that this doesn't appear to
> enforce any type-safety, which could lead to maintainability issues.)
>
> Finally, I'll add that this pattern feels like it's abusing the Transformer
> SPI. The interface assumes that transformation is always 1:1, which is
> artificially limiting. I imagine some sort of generalization of this part
> of the system could improve usability. For example, both 'transform' and
> 'punctuate' might be reframed as void methods that receive a type-safe
> interface for 'context.forward'. (I have this small change drafted up
> within the kafka trunk sources, and could submit a PR if the maintainers
> are interested?)
>
> Thanks,
> -josh

Re: Sessionizing inputs with Kafka Streams

Posted by josh gruenberg <jo...@gmail.com>.
Thank you, Guozhang! In my exploration, I did overlook the "transform"
method; this looks promising.

I could still use a little more help: I'm confused because for this
sessionization use-case, an invocation of the 'transform' method usually
suggests that a session is still active, so I'll have nothing to emit from
'transform'. Instead, I'm guessing I'll need to produce my results from the
'punctuate' callback. So my questions are:

1. What should I return from 'transform' to indicate that I have no output
at this time? From my reading of 'KStreamTransformProcessor.process', it
appears that "null" won't fly. Should I return a dummy KeyValue, and then
filter that out downstream? Seems a little cumbersome, but perhaps not
terrible as an interim solution... Is there a better way?
2. To emit completed aggregations in response to 'punctuate', can I just
send them via 'context.forward'? (I'll note that this doesn't appear to
enforce any type-safety, which could lead to maintainability issues.)

Finally, I'll add that this pattern feels like it's abusing the Transformer
SPI. The interface assumes that transformation is always 1:1, which is
artificially limiting. I imagine some sort of generalization of this part
of the system could improve usability. For example, both 'transform' and
'punctuate' might be reframed as void methods that receive a type-safe
interface for 'context.forward'. (I have this small change drafted up
within the kafka trunk sources, and could submit a PR if the maintainers
are interested?)
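
As a rough illustration of that idea (not the actual drafted change, which is not shown in this thread), the reframed interface might look something like the sketch below. `Forwarder`, `EmittingTransformer` and `CountPerKey` are hypothetical names and are not part of Kafka Streams:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical type-safe stand-in for calling context.forward directly.
interface Forwarder<K, V> {
    void forward(K key, V value);
}

// Hypothetical reshaped Transformer: both callbacks are void and may
// emit zero, one, or many records through the typed forwarder.
interface EmittingTransformer<K, V, KOut, VOut> {
    void transform(K key, V value, Forwarder<KOut, VOut> out);

    void punctuate(long timestampMs, Forwarder<KOut, VOut> out);
}

// Example implementation: counts records per key and emits only on punctuate.
final class CountPerKey implements EmittingTransformer<String, String, String, Integer> {
    // TreeMap keeps keys sorted so emission order is deterministic.
    private final Map<String, Integer> counts = new TreeMap<>();

    @Override
    public void transform(String key, String value, Forwarder<String, Integer> out) {
        counts.merge(key, 1, Integer::sum); // nothing to emit yet
    }

    @Override
    public void punctuate(long timestampMs, Forwarder<String, Integer> out) {
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            out.forward(e.getKey(), e.getValue());
        }
        counts.clear();
    }
}
```

Because the output types appear in the forwarder's signature, the compiler would reject a mistyped emission, and no sentinel is needed when there is nothing to emit.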

Thanks,
-josh

On Wed, Mar 23, 2016 at 11:02 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Josh,
>
> Kafka Streams does not yet support session windows as in the Dataflow
> model, though we have near-term plans to support them.
>
> For now you can work around this with a custom processor by calling the
> "KStream.transform()" function, which still returns a stream object.
> In your custom "Transformer" implementation, you can attach a state
> store of your own, access it in the "transform" function, and return
> results only when, for example, a session has ended.
>
> As a concrete example, Confluent has some internal tools that already use
> Kafka Streams for online operations, and they need a session window
> processor as well. We use the "transform" function in the Streams DSL
> (i.e. "KStreamBuilder") as in the following sketch:
>
> --------------
>
> builder.addStateStore(/* new RocksDBKeyValueStoreSupplier(..)*/,
> "store-name");
>
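> stream1 = builder.stream("source-topic");
>
> // transform() returns a new stream, so the result can be processed further
> stream2 = stream1.transform(MyTransformerFunc, "store-name");
>
> --------------
>
> then in MyTransformerFunc:
>
> public void init(ProcessorContext context) {
>     // look up the store registered above; cast it to your store type as needed
>     this.kvStore = context.getStateStore("store-name");
>
>     // now you can access this store in the transform function.
> }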
>
> --------------
>
>
> Hope this helps.
>
> Guozhang

Re: Sessionizing inputs with Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Josh,

Kafka Streams does not yet support session windows as in the Dataflow
model, though we have near-term plans to support them.

For now you can work around this with a custom processor by calling the
"KStream.transform()" function, which still returns a stream object.
In your custom "Transformer" implementation, you can attach a state
store of your own, access it in the "transform" function, and return
results only when, for example, a session has ended.

As a concrete example, Confluent has some internal tools that already use
Kafka Streams for online operations, and they need a session window
processor as well. We use the "transform" function in the Streams DSL
(i.e. "KStreamBuilder") as in the following sketch:

--------------

builder.addStateStore(/* new RocksDBKeyValueStoreSupplier(..)*/,
"store-name");

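stream1 = builder.stream("source-topic");

// transform() returns a new stream, so the result can be processed further
stream2 = stream1.transform(MyTransformerFunc, "store-name");

--------------

then in MyTransformerFunc:

public void init(ProcessorContext context) {
    // look up the store registered above; cast it to your store type as needed
    this.kvStore = context.getStateStore("store-name");

    // now you can access this store in the transform function.
}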

--------------
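
To illustrate how the store might then be used for sessionizing, here is a plain-Java sketch of the per-key bookkeeping. It is an assumption-laden model, not Kafka Streams code: a HashMap stands in for the state store, `SessionTracker` is a hypothetical name, `onEvent` plays the role of "transform", and `closeExpired` plays the role of a periodic "punctuate" call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class SessionTracker {
    // Stand-in for the state store: key -> {sessionStartMs, lastSeenMs, eventCount}.
    private final Map<String, long[]> store = new HashMap<>();
    private final long gapMs;

    SessionTracker(long gapMs) {
        this.gapMs = gapMs;
    }

    // Role of transform(): updates the session for this key. Returns a summary
    // of the previous session if this event starts a new one, otherwise null.
    String onEvent(String key, long timestampMs) {
        long[] session = store.get(key);
        String closed = null;
        if (session != null && timestampMs - session[1] > gapMs) {
            closed = summarize(key, session);
            session = null;
        }
        if (session == null) {
            session = new long[] {timestampMs, timestampMs, 0};
            store.put(key, session);
        }
        session[1] = timestampMs;
        session[2]++;
        return closed;
    }

    // Role of punctuate(): closes every session idle for longer than the gap.
    List<String> closeExpired(long nowMs) {
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<String, long[]>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, long[]> entry = it.next();
            if (nowMs - entry.getValue()[1] > gapMs) {
                closed.add(summarize(entry.getKey(), entry.getValue()));
                it.remove();
            }
        }
        return closed;
    }

    // Formats a session as "key:start-end:count".
    private static String summarize(String key, long[] s) {
        return key + ":" + s[0] + "-" + s[1] + ":" + s[2];
    }
}
```

The sketch assumes the time passed to `closeExpired` is on the same clock as the event timestamps; out-of-order events are not handled.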


Hope this helps.

Guozhang
