You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Antony Mayi <an...@yahoo.com> on 2017/02/20 09:30:17 UTC

collect to local

Hi,
what is the best way to fetch content of PCollection to local memory/process (something like calling .collect() on Spark rdd)? Do I need to implement custom Sink?
Thanks for any clues,Antony.

Re: collect to local

Posted by Dan Halperin <dh...@google.com>.
It may also be worth looking at how Scio implements this. The Scio version
based on Beam is here: https://github.com/spotify/scio/tree/apache-beam

And they have given some good talks.
https://www.slideshare.net/sinisalyh/scio-a-scala-api-for-google-cloud-dataflow-apache-beam
I believe the "closeAndCollect" operator in Scio is the one like Amit is
discussion.

On Mon, Feb 20, 2017 at 2:32 AM, Amit Sela <am...@gmail.com> wrote:

> You could consider using Aggregators or Metrics (Metrics are still
> experimental and currently only supported by the Direct and Spark runner).
>
> Simply add a DoFn that reports to the Aggregator - see here
> <https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java#L108> how
> to use Aggregators in DoFn.
> Then query
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L75>
> the result in the PipelineResult.
>
> Would this work for your use case ?
>
>
> On Mon, Feb 20, 2017 at 12:17 PM Antony Mayi <an...@yahoo.com> wrote:
>
>>
>> Thanks Amit,
>>
>> I fully understand the controversy of trying to collect Big data into
>> local memory... But lets say the data is result of some reduce operation so
>> driver OOM is not a problem and further processing needs to continue in the
>> driver and getting it there via Kafka is an overkill (ie the system would
>> otherwise not use Kafka at all so this would mean new dependency). I get
>> the point that I could implement all the rest on PCollection but once
>> (significant) part of the pipeline doesn't need big-data/map-reduce
>> tool-set, it would just be way easier implementing it locally.
>>
>> Antony.
>> On Monday, 20 February 2017, 10:53, Amit Sela <am...@gmail.com>
>> wrote:
>>
>>
>> Hi Antony,
>>
>> Generally, PCollections are a distributed bag of elements, just like
>> Spark RDDs (for batch).
>> Assuming you have a distributed collection, you probably wouldn't want to
>> materialize it locally, and even if it's a global count (result) of some
>> kind (guaranteeing to avoid OOM in your "driver") you'd probably want to
>> write it to a Sink of some kind - Kafka, HDFS, etc.
>>
>> I'm curious how would you use "collect()" or materializing the
>> PCollection in the driver program ? what did you have in mind ?
>>
>> You can implement a custom Sink - Spark runner has it's own ConsoleIO to
>> print to screen using Spark's print() but I use it for dev iterations and
>> it clearly works only for the Spark runner.
>>
>> Amit
>>
>>
>> On Mon, Feb 20, 2017 at 11:40 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
>> wrote:
>>
>> Hi Antony,
>>
>> The Spark runner deals with caching/persist for you (analyzing how many
>> time the same PCollection is used).
>>
>> For the collect(), I don't fully understand your question.
>>
>> If if you want to process elements in the PCollection, you can do simple
>> ParDo:
>>
>> .apply(ParDo.of(new DoFn() {
>>    @ProcessElement
>>    public void processElements(ProcessContext context) {
>>      T element = context.element();
>>      // do whatever you want
>>    }
>> })
>>
>> Is it not what you want ?
>>
>> Regards
>> JB
>>
>> On 02/20/2017 10:30 AM, Antony Mayi wrote:
>> > Hi,
>> >
>> > what is the best way to fetch content of PCollection to local
>> > memory/process (something like calling .collect() on Spark rdd)? Do I
>> > need to implement custom Sink?
>> >
>> > Thanks for any clues,
>> > Antony.
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>>
>>
>>

Re: collect to local

Posted by Amit Sela <am...@gmail.com>.
You could consider using Aggregators or Metrics (Metrics are still
experimental and currently only supported by the Direct and Spark runner).

Simply add a DoFn that reports to the Aggregator - see here
<https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java#L108>
how
to use Aggregators in DoFn.
Then query
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L75>
the result in the PipelineResult.

Would this work for your use case ?


On Mon, Feb 20, 2017 at 12:17 PM Antony Mayi <an...@yahoo.com> wrote:

>
> Thanks Amit,
>
> I fully understand the controversy of trying to collect Big data into
> local memory... But lets say the data is result of some reduce operation so
> driver OOM is not a problem and further processing needs to continue in the
> driver and getting it there via Kafka is an overkill (ie the system would
> otherwise not use Kafka at all so this would mean new dependency). I get
> the point that I could implement all the rest on PCollection but once
> (significant) part of the pipeline doesn't need big-data/map-reduce
> tool-set, it would just be way easier implementing it locally.
>
> Antony.
> On Monday, 20 February 2017, 10:53, Amit Sela <am...@gmail.com>
> wrote:
>
>
> Hi Antony,
>
> Generally, PCollections are a distributed bag of elements, just like Spark
> RDDs (for batch).
> Assuming you have a distributed collection, you probably wouldn't want to
> materialize it locally, and even if it's a global count (result) of some
> kind (guaranteeing to avoid OOM in your "driver") you'd probably want to
> write it to a Sink of some kind - Kafka, HDFS, etc.
>
> I'm curious how would you use "collect()" or materializing the PCollection
> in the driver program ? what did you have in mind ?
>
> You can implement a custom Sink - Spark runner has it's own ConsoleIO to
> print to screen using Spark's print() but I use it for dev iterations and
> it clearly works only for the Spark runner.
>
> Amit
>
>
> On Mon, Feb 20, 2017 at 11:40 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
> Hi Antony,
>
> The Spark runner deals with caching/persist for you (analyzing how many
> time the same PCollection is used).
>
> For the collect(), I don't fully understand your question.
>
> If if you want to process elements in the PCollection, you can do simple
> ParDo:
>
> .apply(ParDo.of(new DoFn() {
>    @ProcessElement
>    public void processElements(ProcessContext context) {
>      T element = context.element();
>      // do whatever you want
>    }
> })
>
> Is it not what you want ?
>
> Regards
> JB
>
> On 02/20/2017 10:30 AM, Antony Mayi wrote:
> > Hi,
> >
> > what is the best way to fetch content of PCollection to local
> > memory/process (something like calling .collect() on Spark rdd)? Do I
> > need to implement custom Sink?
> >
> > Thanks for any clues,
> > Antony.
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>

Re: collect to local

Posted by Antony Mayi <an...@yahoo.com>.
Thanks Amit,
I fully understand the controversy of trying to collect Big data into local memory... But lets say the data is result of some reduce operation so driver OOM is not a problem and further processing needs to continue in the driver and getting it there via Kafka is an overkill (ie the system would otherwise not use Kafka at all so this would mean new dependency). I get the point that I could implement all the rest on PCollection but once (significant) part of the pipeline doesn't need big-data/map-reduce tool-set, it would just be way easier implementing it locally.
Antony.    On Monday, 20 February 2017, 10:53, Amit Sela <am...@gmail.com> wrote:
 

 Hi Antony, 
Generally, PCollections are a distributed bag of elements, just like Spark RDDs (for batch).Assuming you have a distributed collection, you probably wouldn't want to materialize it locally, and even if it's a global count (result) of some kind (guaranteeing to avoid OOM in your "driver") you'd probably want to write it to a Sink of some kind - Kafka, HDFS, etc.
I'm curious how would you use "collect()" or materializing the PCollection in the driver program ? what did you have in mind ?
You can implement a custom Sink - Spark runner has it's own ConsoleIO to print to screen using Spark's print() but I use it for dev iterations and it clearly works only for the Spark runner.
Amit 
On Mon, Feb 20, 2017 at 11:40 AM Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:

Hi Antony,

The Spark runner deals with caching/persist for you (analyzing how many
time the same PCollection is used).

For the collect(), I don't fully understand your question.

If if you want to process elements in the PCollection, you can do simple
ParDo:

.apply(ParDo.of(new DoFn() {
   @ProcessElement
   public void processElements(ProcessContext context) {
     T element = context.element();
     // do whatever you want
   }
})

Is it not what you want ?

Regards
JB

On 02/20/2017 10:30 AM, Antony Mayi wrote:
> Hi,
>
> what is the best way to fetch content of PCollection to local
> memory/process (something like calling .collect() on Spark rdd)? Do I
> need to implement custom Sink?
>
> Thanks for any clues,
> Antony.

--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



   

Re: collect to local

Posted by Amit Sela <am...@gmail.com>.
Hi Antony,

Generally, PCollections are a distributed bag of elements, just like Spark
RDDs (for batch).
Assuming you have a distributed collection, you probably wouldn't want to
materialize it locally, and even if it's a global count (result) of some
kind (guaranteeing to avoid OOM in your "driver") you'd probably want to
write it to a Sink of some kind - Kafka, HDFS, etc.

I'm curious how would you use "collect()" or materializing the PCollection
in the driver program ? what did you have in mind ?

You can implement a custom Sink - Spark runner has it's own ConsoleIO to
print to screen using Spark's print() but I use it for dev iterations and
it clearly works only for the Spark runner.

Amit


On Mon, Feb 20, 2017 at 11:40 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Antony,
>
> The Spark runner deals with caching/persist for you (analyzing how many
> time the same PCollection is used).
>
> For the collect(), I don't fully understand your question.
>
> If if you want to process elements in the PCollection, you can do simple
> ParDo:
>
> .apply(ParDo.of(new DoFn() {
>    @ProcessElement
>    public void processElements(ProcessContext context) {
>      T element = context.element();
>      // do whatever you want
>    }
> })
>
> Is it not what you want ?
>
> Regards
> JB
>
> On 02/20/2017 10:30 AM, Antony Mayi wrote:
> > Hi,
> >
> > what is the best way to fetch content of PCollection to local
> > memory/process (something like calling .collect() on Spark rdd)? Do I
> > need to implement custom Sink?
> >
> > Thanks for any clues,
> > Antony.
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: collect to local

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Agree with Amit. Not sure it would be portable to provide such function.

Regards
JB

On 02/20/2017 11:04 AM, Amit Sela wrote:
> Spark runner's EvaluationContext
> <https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L201>
> has a hook ready for this - but clearly only for batch, in streaming
> this feature doesn't seem relevant.
>
> You can easily hack this in the Spark runner, but for Beam in general I
> wonder how this would work in a runner-agnostic way ?
> Spark has a driver process, not sure how this works for other runners.
>
> On Mon, Feb 20, 2017 at 11:54 AM Antony Mayi <antonymayi@yahoo.com
> <ma...@yahoo.com>> wrote:
>
>     Thanks Jean,
>
>     my point is to retrieve the data represented let say as
>     PCollection<String> to a List<String> (not
>     PCollection<List<String>>) - essentially fetching it all to address
>     space of the local driver process (this is what Spark's .collect()
>     does). It would be a reverse of the beam.sdks.transforms.Create -
>     which takes a local iterable and distributes it into PCollection -
>     at the end of the pipeline I would like to get the result back to
>     single iterable (hence I assuming it would need some type of Sink).
>
>     Thanks,
>     Antony.
>
>
>     On Monday, 20 February 2017, 10:40, Jean-Baptiste Onofr�
>     <jb@nanthrax.net <ma...@nanthrax.net>> wrote:
>
>
>     Hi Antony,
>
>     The Spark runner deals with caching/persist for you (analyzing how many
>     time the same PCollection is used).
>
>     For the collect(), I don't fully understand your question.
>
>     If if you want to process elements in the PCollection, you can do
>     simple
>     ParDo:
>
>     .apply(ParDo.of(new DoFn() {
>       @ProcessElement
>       public void processElements(ProcessContext context) {
>         T element = context.element();
>         // do whatever you want
>       }
>     })
>
>     Is it not what you want ?
>
>     Regards
>     JB
>
>     On 02/20/2017 10:30 AM, Antony Mayi wrote:
>     > Hi,
>     >
>     > what is the best way to fetch content of PCollection to local
>     > memory/process (something like calling .collect() on Spark rdd)? Do I
>     > need to implement custom Sink?
>     >
>     > Thanks for any clues,
>     > Antony.
>
>
>     --
>     Jean-Baptiste Onofr�
>     jbonofre@apache.org <ma...@apache.org>
>     http://blog.nanthrax.net <http://blog.nanthrax.net/>
>     Talend - http://www.talend.com <http://www.talend.com/>
>
>
>

-- 
Jean-Baptiste Onofr�
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: collect to local

Posted by Amit Sela <am...@gmail.com>.
Spark runner's EvaluationContext
<https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L201>
has a hook ready for this - but clearly only for batch, in streaming this
feature doesn't seem relevant.

You can easily hack this in the Spark runner, but for Beam in general I
wonder how this would work in a runner-agnostic way ?
Spark has a driver process, not sure how this works for other runners.

On Mon, Feb 20, 2017 at 11:54 AM Antony Mayi <an...@yahoo.com> wrote:

> Thanks Jean,
>
> my point is to retrieve the data represented let say as
> PCollection<String> to a List<String> (not PCollection<List<String>>) -
> essentially fetching it all to address space of the local driver process
> (this is what Spark's .collect() does). It would be a reverse of the
> beam.sdks.transforms.Create - which takes a local iterable and distributes
> it into PCollection - at the end of the pipeline I would like to get the
> result back to single iterable (hence I assuming it would need some type of
> Sink).
>
> Thanks,
> Antony.
>
>
> On Monday, 20 February 2017, 10:40, Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
>
> Hi Antony,
>
> The Spark runner deals with caching/persist for you (analyzing how many
> time the same PCollection is used).
>
> For the collect(), I don't fully understand your question.
>
> If if you want to process elements in the PCollection, you can do simple
> ParDo:
>
> .apply(ParDo.of(new DoFn() {
>   @ProcessElement
>   public void processElements(ProcessContext context) {
>     T element = context.element();
>     // do whatever you want
>   }
> })
>
> Is it not what you want ?
>
> Regards
> JB
>
> On 02/20/2017 10:30 AM, Antony Mayi wrote:
> > Hi,
> >
> > what is the best way to fetch content of PCollection to local
> > memory/process (something like calling .collect() on Spark rdd)? Do I
> > need to implement custom Sink?
> >
> > Thanks for any clues,
> > Antony.
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>

Re: collect to local

Posted by Antony Mayi <an...@yahoo.com>.
Thanks Jean,
my point is to retrieve the data represented let say as PCollection<String> to a List<String> (not PCollection<List<String>>) - essentially fetching it all to address space of the local driver process (this is what Spark's .collect() does). It would be a reverse of the beam.sdks.transforms.Create - which takes a local iterable and distributes it into PCollection - at the end of the pipeline I would like to get the result back to single iterable (hence I assuming it would need some type of Sink).
Thanks,Antony. 

    On Monday, 20 February 2017, 10:40, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
 

 Hi Antony,

The Spark runner deals with caching/persist for you (analyzing how many 
time the same PCollection is used).

For the collect(), I don't fully understand your question.

If if you want to process elements in the PCollection, you can do simple 
ParDo:

.apply(ParDo.of(new DoFn() {
  @ProcessElement
  public void processElements(ProcessContext context) {
    T element = context.element();
    // do whatever you want
  }
})

Is it not what you want ?

Regards
JB

On 02/20/2017 10:30 AM, Antony Mayi wrote:
> Hi,
>
> what is the best way to fetch content of PCollection to local
> memory/process (something like calling .collect() on Spark rdd)? Do I
> need to implement custom Sink?
>
> Thanks for any clues,
> Antony.

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


   

Re: collect to local

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Antony,

The Spark runner deals with caching/persist for you (analyzing how many 
time the same PCollection is used).

For the collect(), I don't fully understand your question.

If if you want to process elements in the PCollection, you can do simple 
ParDo:

.apply(ParDo.of(new DoFn() {
   @ProcessElement
   public void processElements(ProcessContext context) {
     T element = context.element();
     // do whatever you want
   }
})

Is it not what you want ?

Regards
JB

On 02/20/2017 10:30 AM, Antony Mayi wrote:
> Hi,
>
> what is the best way to fetch content of PCollection to local
> memory/process (something like calling .collect() on Spark rdd)? Do I
> need to implement custom Sink?
>
> Thanks for any clues,
> Antony.

-- 
Jean-Baptiste Onofr�
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com