Posted to dev@beam.apache.org by Kyle Winkelman <wi...@gmail.com> on 2020/09/24 15:23:45 UTC

Kafka Streams Runner [BEAM-2466]

Hello everyone,



Jira:

https://issues.apache.org/jira/browse/BEAM-2466



My Branch:

https://github.com/kyle-winkelman/beam/tree/kafka-streams



I have taken an initial pass at creating a Kafka Streams Runner. It passes
nearly all of the @ValidatesRunner tests. I am hoping to find someone that
has experience writing a Runner to take a look at it and give me some
feedback before I open a PR.



I am using the Kafka Streams DSL
<https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html>,
so a PCollection is equivalent to a KStream
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html>.



ParDo:

  - implements a Transformer
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/Transformer.html>
passed to KStream#transform
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-org.apache.kafka.streams.kstream.Named-java.lang.String...->

  - outputs a KStream<TupleTag<?>, WindowedValue<?>>, then uses KStream#branch
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#branch-org.apache.kafka.streams.kstream.Predicate...->
to handle ParDo.MultiOutput

  - schedules a Punctuator
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/processor/Punctuator.html>
to periodically start/finish bundles (see the sketch below)
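
To illustrate the shape of that translation, here is a simplified,
self-contained sketch (not the actual runner code; the topic names, the
tagging logic, and the bundle punctuation are illustrative stand-ins, with
plain Strings in place of TupleTag/WindowedValue):

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class ParDoSketch {

  // Stand-in for the runner's DoFn-wrapping Transformer: it re-keys every
  // output by an output tag and periodically "finishes" a bundle.
  static class TaggingTransformer
      implements Transformer<String, String, KeyValue<String, String>> {
    @Override
    public void init(ProcessorContext context) {
      // Punctuator stand-in for periodic bundle management.
      context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
          timestamp -> { /* finishBundle(); startBundle(); */ });
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
      // A real runner would invoke the DoFn here; the key becomes the output's tag.
      String tag = value.startsWith("a") ? "main" : "additional";
      return KeyValue.pair(tag, value);
    }

    @Override
    public void close() {}
  }

  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input = builder.stream("beam-input"); // hypothetical topic
    KStream<String, String> tagged = input.transform(() -> new TaggingTransformer());

    // ParDo.MultiOutput: split the tagged stream back into one KStream per tag.
    KStream<String, String>[] outputs = tagged.branch(
        (tag, value) -> "main".equals(tag),
        (tag, value) -> "additional".equals(tag));
    outputs[0].to("beam-main-output");
    outputs[1].to("beam-additional-output");
  }
}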



GroupByKey:

  - KStream#repartition
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#repartition-->
the data to groupOnlyByKey

  - groupAlsoByWindow works similarly to ParDo, but runs ReduceFn instead of DoFn (see the sketch below)
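
The shuffle half of that looks roughly like the following sketch
(illustrative only; the topic, serdes, and repartition name are placeholders,
and the groupAlsoByWindow Transformer is elided):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class GroupByKeySketch {
  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> keyed =
        builder.stream("beam-gbk-input", Consumed.with(Serdes.String(), Serdes.String()));

    // groupOnlyByKey: force a shuffle so all values for a key land on one task.
    KStream<String, String> shuffled =
        keyed.repartition(Repartitioned.with(Serdes.String(), Serdes.String()).withName("beam-gbk"));

    // groupAlsoByWindow would then run in a Transformer attached via
    // KStream#transform (as in the ParDo sketch), buffering values per key and
    // window and emitting when the window's trigger fires.
  }
}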



Flatten:

  - KStream#merge
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#merge-org.apache.kafka.streams.kstream.KStream->
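
For example (a minimal sketch with hypothetical topic names):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class FlattenSketch {
  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    // Hypothetical input topics standing in for the PCollections being flattened.
    KStream<String, String> first = builder.stream("beam-pcollection-a");
    KStream<String, String> second = builder.stream("beam-pcollection-b");
    // Flatten is simply the merged KStream of all inputs.
    KStream<String, String> flattened = first.merge(second);
    flattened.to("beam-flattened");
  }
}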



Combine:

  - not translated as a composite; executed as the primitive GroupByKey/ParDo transforms



Composite Transforms:

  - inlining (I believe)



Side Inputs:

  - write data to a topic with the key being the StateNamespace.stringKey()

  - read topic into a GlobalKTable
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/GlobalKTable.html>
and materialize it into a KeyValueStore
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/state/KeyValueStore.html>

  - in ParDo, access the KeyValueStore via the ProcessorContext
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/processor/ProcessorContext.html>,
and use the WindowFn to get the side input window and its
StateNamespace.stringKey() to look up the value (see the sketch below)
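
A simplified sketch of that pattern (the topic/store names are hypothetical,
plain Strings stand in for StateNamespace.stringKey(), and the lookup key is
hard-coded for illustration):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class SideInputSketch {
  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();

    // Side input: read the topic into a GlobalKTable, materialized as a named
    // KeyValueStore so every instance has a full local copy.
    builder.globalTable(
        "beam-side-input",
        Consumed.with(Serdes.String(), Serdes.String()),
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("beam-side-input-store"));

    // Main input: a Transformer looks the side input up by window
    // (StateNamespace.stringKey() in the real runner).
    KStream<String, String> main =
        builder.stream("beam-main-input", Consumed.with(Serdes.String(), Serdes.String()));
    KStream<String, String> withSide =
        main.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
          private ProcessorContext context;

          @Override
          public void init(ProcessorContext context) {
            this.context = context;
          }

          @Override
          public KeyValue<String, String> transform(String key, String value) {
            // Global state stores are readable from any processor without declaring them.
            @SuppressWarnings("unchecked")
            KeyValueStore<String, String> store =
                (KeyValueStore<String, String>) context.getStateStore("beam-side-input-store");
            // The real runner derives this key from the side input window.
            String side = store.get("/GlobalWindow/");
            return KeyValue.pair(key, value + "|" + side);
          }

          @Override
          public void close() {}
        });
    withSide.to("beam-output");
  }
}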



Source API:

  - overridden by SplittableDoFn, via Read.SPLITTABLE_DOFN_PREFERRED_RUNNERS



Impulse:

  - if it does not already exist, create a topic in Kafka and use a KafkaProducer
<https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>
to generate the initial record

  - generate a KStream from the above topic (see the sketch below)
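
A rough sketch of that, outside the runner (the topic name and configs are
hypothetical):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class ImpulseSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");

    // Create the impulse topic if it does not already exist.
    try (AdminClient admin = AdminClient.create(props)) {
      if (!admin.listTopics().names().get().contains("beam-impulse")) {
        admin.createTopics(
            Collections.singletonList(new NewTopic("beam-impulse", 1, (short) 1))).all().get();
      }
    }

    // Produce the single empty record that Impulse emits.
    try (KafkaProducer<byte[], byte[]> producer =
        new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
      producer.send(new ProducerRecord<>("beam-impulse", new byte[0], new byte[0])).get();
    }

    // The impulse PCollection is the KStream built from that topic.
    StreamsBuilder builder = new StreamsBuilder();
    KStream<byte[], byte[]> impulse = builder.stream("beam-impulse");
  }
}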



SplittableDoFn:

  - overridden by SplittableParDo/SplittableParDoViaKeyedWorkItems

  - extends Stateful Processing, running ProcessFn instead of DoFn; the
Punctuator fires timers as expected by the ProcessFn

  - write watermarks to a separate topic, which is aggregated, read into a
GlobalKTable, and materialized into a KeyValueStore



Stateful Processing:

  - StateInternals and TimerInternals are materialized in KeyValueStores

  - extends ParDo, using a StatefulDoFnRunner; the Punctuator advances
TimerInternals (via the watermarks KeyValueStore) and fires timers (see the
sketch below)
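
A simplified sketch of the idea (store/topic names and the timer layout are
hypothetical stand-ins for the runner's StateInternals/TimerInternals; the
stream-time punctuation stands in for watermark advancement):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class StatefulParDoSketch {
  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    // One store for user state, one for timers (keyed by firing timestamp here,
    // purely for illustration).
    StoreBuilder<KeyValueStore<String, String>> stateStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("beam-state"), Serdes.String(), Serdes.String());
    StoreBuilder<KeyValueStore<Long, String>> timerStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("beam-timers"), Serdes.Long(), Serdes.String());
    builder.addStateStore(stateStore);
    builder.addStateStore(timerStore);

    builder.<String, String>stream("beam-stateful-input")
        .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
          private ProcessorContext context;
          private KeyValueStore<String, String> state;
          private KeyValueStore<Long, String> timers;

          @SuppressWarnings("unchecked")
          @Override
          public void init(ProcessorContext context) {
            this.context = context;
            state = (KeyValueStore<String, String>) context.getStateStore("beam-state");
            timers = (KeyValueStore<Long, String>) context.getStateStore("beam-timers");
            // The real runner advances the watermark from the watermarks store;
            // here stream-time punctuation stands in for that advancement.
            context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, watermark -> {
              try (KeyValueIterator<Long, String> due = timers.range(0L, watermark)) {
                while (due.hasNext()) {
                  KeyValue<Long, String> timer = due.next();
                  // onTimer(...) would run here; then the fired timer is deleted.
                  timers.delete(timer.key);
                }
              }
            });
          }

          @Override
          public KeyValue<String, String> transform(String key, String value) {
            state.put(key, value);                            // user state write
            timers.put(context.timestamp() + 60_000L, key);   // set an event-time timer
            return KeyValue.pair(key, value);
          }

          @Override
          public void close() {}
        }, "beam-state", "beam-timers");
  }
}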


Thanks,

Kyle Winkelman

Re: Kafka Streams Runner [BEAM-2466]

Posted by Luke Cwik <lc...@google.com>.
That's exciting.

I would suggest that you take a look at implementing a portable runner so
that you get cross language pipelines and the ability to execute Python and
Go pipelines. Looking at https://s.apache.org/beam-fn-api and the Flink or
Samza implementations would be good starting points.



Re: Kafka Streams Runner [BEAM-2466]

Posted by Pablo Estrada <pa...@google.com>.
Hi Kyle,
this is very cool. For others, here's the link to compare Kyle's branch to
Beam's master[1].

I don't have a lot of experience writing runners, so someone else should
look as well, but I'll try to take a look at your branch in the next few
days, because I think it's pretty cool : )


[1]
https://github.com/apache/beam/compare/master...kyle-winkelman:kafka-streams
