Posted to dev@beam.apache.org by Jean-Baptiste Onofré <jb...@nanthrax.net> on 2016/10/14 12:12:31 UTC

[DISCUSS] Introduce DoFnWithStore

Hi guys,

When testing the different IOs, we want to have the best possible 
coverage and be able to test with different use cases.

We create integration test pipelines, and one "classic" use case is to
implement a pipeline starting from an unbounded source (providing an
unbounded PCollection, like Kafka, JMS, MQTT, ...) and sending data to a
bounded sink (TextIO for instance) expecting a bounded PCollection.

This use case is not currently possible. Even when using a Window, it
only creates chunks of the unbounded PCollection; the PCollection itself
is still unbounded.

That's why I created: https://issues.apache.org/jira/browse/BEAM-638.

However, I don't think a WindowFn/Trigger is the best approach.

A possible solution would be to create a specific IO
(BoundedWriteFromUnboundedSourceIO, similar to the one we have for Read
;)) to do that, but I think we should provide a more general way, as this
use case is not specific to IO. For instance, a sorting PTransform will
work only on a bounded PCollection (not an unbounded one).

I wonder if we couldn't provide a DoFnWithStore. The purpose is to
store unbounded PCollection elements (scoped by a Window for instance)
into a pluggable store and read from the store to provide a bounded
PCollection. The store/read trigger could fire on the finish of a bundle.
We could provide a "store service", for instance based on GCS, HDFS, or
any other storage (Elasticsearch, Cassandra, ...).

Spark users might be "confused", as in Spark this behavior is "native"
thanks to the micro-batches: in spark-streaming, a DStream is basically
a sequence of RDDs, each of which is a bounded collection.

Basically, the DoFnWithStore would look like a DoFn with implicit
store/read from the store. Something like:

public abstract class DoFnWithStore<InputT, OutputT>
    extends DoFn<InputT, OutputT> {

   @ProcessElement
   @Store(Window)
   ....

}

Generally, SDF (Splittable DoFn) sounds like a native way to let users
implement this behavior explicitly.

My proposal is to do it implicitly and transparently for the end users 
(they just have to provide the Window definition and the store service 
to use).
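
To make the intent concrete, here is a purely hypothetical usage sketch
(DoFnWithStore, @Store and HdfsStoreService don't exist; the names are
invented for illustration):

PCollection<String> bounded = unbounded.apply(ParDo.of(
    new DoFnWithStore<String, String>(
        FixedWindows.of(Duration.standardMinutes(5)), // window definition
        new HdfsStoreService("hdfs:///tmp/staging")) { // invented store service
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element()); // user logic stays a plain DoFn
      }
    }));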

Thoughts?

Regards
JB
-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: [DISCUSS] Introduce DoFnWithStore

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Hi JB,
Can you clarify what you want to achieve by "writing an unbounded
PCollection to a bounded sink"? I see several options:

1. Read some amount of data from the collection (limited by number of
elements or by time), and write just that, and stop reading from it - as if
the collection was actually bounded but we use a prefix of it and
completely throw away the suffix. I believe this is subsumed by
Read.from(UnboundedSource).withMaxNumRecords/withMaxReadTime - with certain
limitations, of course (e.g. there's really no way to limit number of
elements or time when we're reading from several splits of an unbounded
source in parallel from different machines, except applying the limits to
each split; and there's no way to strictly guarantee either #elements or
readTime if the collection is produced by a splittable DoFn).
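
For illustration, a minimal sketch of option 1, bounding the read itself
(myUnboundedSource and the output path are placeholders; imports are
org.apache.beam.sdk.io.Read, org.apache.beam.sdk.io.TextIO and
org.joda.time.Duration):

PCollection<String> bounded = pipeline.apply(
    Read.from(myUnboundedSource)      // any UnboundedSource producing Strings
        .withMaxNumRecords(100000)    // stop after ~100k records,
        .withMaxReadTime(Duration.standardMinutes(5))); // or after 5 minutes

bounded.apply(TextIO.Write.to("/tmp/output")); // a bounded sink now accepts it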

2. A variation on #1 where "some amount of data" means "a particular
window" - we write that window, throw away data in all other windows, and
stop the pipeline / throw away everything else when the window of interest
closes. I have a hard time thinking of a clean way to do this - how would
you specify which window to write? The window would depend on the data
you're reading from the source, combined with the windowing strategy.

3. Interpret *each window* of the collection as a bounded collection, and
write it to a bounded sink, presumably with the sink configured depending
on the particular window. I think this would be useful, and in fact
BigQueryIO does this; however, we lack the APIs to configure each sink in
a window-dependent way in general. This is subsumed by
https://issues.apache.org/jira/browse/BEAM-92
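
For reference, BigQueryIO's window-dependent configuration looks roughly
like this (project, dataset and schema are placeholders):

rows.apply(BigQueryIO.Write
    .to(new SerializableFunction<BoundedWindow, String>() {
      @Override
      public String apply(BoundedWindow window) {
        // One table per window; the naming scheme is illustrative only.
        return "my-project:my_dataset.events_"
            + window.maxTimestamp().getMillis();
      }
    })
    .withSchema(schema));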




Re: [DISCUSS] Introduce DoFnWithStore

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
The only way we have today is to use BoundedReadFromUnboundedSource, or to
use a side input to bridge an unbounded portion of the pipeline with a
bounded portion.
The model allows the side-input bridge between these two portions of the
pipeline, but I can't comment on how well it will work with the runners we
have today.
The bounded portion of the pipeline would need to know upfront some set of
windows it wanted to wait for from the unbounded portion, so that the side
input trigger would fire correctly and allow the bounded portion of the
pipeline to be scheduled to execute.
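
A sketch of that side-input bridge (window size and types are arbitrary,
and as said above, runner support may vary):

final PCollectionView<Iterable<String>> streamView = unbounded
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(View.<String>asIterable());

boundedMain.apply(ParDo.withSideInputs(streamView).of(
    new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // The bounded portion waits until the side input's window has
        // triggered, then sees that window's contents as an Iterable.
        for (String element : c.sideInput(streamView)) {
          c.output(element);
        }
      }
    }));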


Re: [DISCUSS] Introduce DoFnWithStore

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Thanks for the update Lukasz.

How would you implement a "transform" from an unbounded PCollection to
a bounded PCollection?

Even if I use a GroupByKey with something like KV<K, Iterable<V>>, it
doesn't change the type of the PCollection.

You are right about the State API. My proposal is more a way to
implicitly use State in a DoFn.
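
For comparison, the explicit version would look roughly like this with
the State API as proposed (the annotations below follow the design doc
and may differ from what finally lands):

new DoFn<KV<String, String>, String>() {
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("buffer") BagState<String> buffer) {
    // Explicitly buffer each element; DoFnWithStore would do this
    // implicitly, against a pluggable store instead of runner state.
    buffer.add(c.element().getValue());
  }
};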

Regards
JB

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: [DISCUSS] Introduce DoFnWithStore

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
SplittableDoFn is about taking a single element and turning it into
potentially many in a parallel way by allowing an element to be split
across bundles.

I believe a user could do what you describe by using a GBK to group their
data how they want. In your example it would be a single key; they would
then have a KV<K, Iterable<V>> with all the values when reading from that
GBK. The proposed State API also seems to overlap with what you're trying
to achieve.
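
A sketch of that approach, windowing first and using a single constant
key (window size and types are arbitrary):

PCollection<KV<Integer, Iterable<String>>> grouped = input
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(WithKeys.<Integer, String>of(0))   // one key: everything groups
    .apply(GroupByKey.<Integer, String>create());
// Each resulting KV holds one window's worth of data, which a downstream
// DoFn can treat as a bounded batch.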


