Posted to user@beam.apache.org by "S. Sahayaraj" <ss...@quark.com> on 2018/06/06 15:34:01 UTC

Create PCollection<ABC> from List<ABC> in DoFn<>

Hello,
I have created a Java class that extends DoFn<>. In processElement(ProcessContext c), a list of objects of type ABC (List<ABC>) is populated at runtime, and I would like to generate the corresponding PCollection<ABC> from that List<ABC> so that the subsequent transform can process each ABC object in parallel. How do we create a PCollection from an in-memory object created inside a DoFn<>? Or how do we get the Pipeline object from within a DoFn<>? Or are there any SDK guidelines to refer to?


Thanks,
S. Sahayaraj

Re: [EXTERNAL] - Re: Create PCollection<ABC> from List<ABC> in DoFn<>

Posted by Eugene Kirpichov <ki...@google.com>.
No ready-made apps specifically for Beam, but if I may insert a shameless
plug, check out http://jkff.info/software/timeplotters , specifically
"splot" - with some added logging in your code, you'll probably be able to
get the visualization you want out of said logs.
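
As a rough illustration of the "added logging" idea, here is a minimal sketch of a helper that ExecFn could call at the start and end of each element. The class name TraceLog, the tab-separated line layout, and the START/END event names are assumptions made for this example; they are not a Beam API and not the input format of any particular plotting tool, so adjust them to whatever the tool expects.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Instant;

// Hypothetical helper (not part of Beam): formats one trace line per event.
class TraceLog {

  // Best-effort worker host name; falls back to a placeholder if lookup fails.
  static String hostName() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      return "unknown-host";
    }
  }

  // Tab-separated: timestamp, host, thread, event, element id.
  // Tabs are used because thread names may contain spaces.
  static String line(Instant when, String host, String thread, String event, String elementId) {
    return String.join("\t", when.toString(), host, thread, event, elementId);
  }

  // Convenience overload that fills in the current time, host and thread.
  static String line(String event, String elementId) {
    return line(Instant.now(), hostName(), Thread.currentThread().getName(), event, elementId);
  }
}
```

Inside ExecFn's @ProcessElement method one might log TraceLog.line("START", id) before the work and TraceLog.line("END", id) after it. Grouping the resulting lines by host and thread then shows how many elements were in flight at the same time.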

On Fri, Jun 8, 2018 at 6:22 AM S. Sahayaraj <ss...@quark.com> wrote:

> Thank you.  Flatten.iterables gives the answer to my problem, Great stuff
> and promising!!. I have now PCollection<ABC> which has more than 4000
> datasets (ie 4000 ABC objects in PCollection), that will be executed by
> ParDo.of(new ExecFn()).  The computing environment here is on Spark cluster
> which has 8 workers, able to see workers, DAG visualization details on
> Spark admin UI. But, precisely I would like to visualize the parallel
> computation on ABC by ExecFn(). Is there any available tool or app or 3rd
> party components that helps to figure out parallelism happening on the
> pipeline?  Please suggest.

RE: [EXTERNAL] - Re: Create PCollection<ABC> from List<ABC> in DoFn<>

Posted by "S. Sahayaraj" <ss...@quark.com>.
Thank you. Flatten.iterables solves my problem; great stuff, and promising! I now have a PCollection<ABC> with more than 4000 elements (i.e., 4000 ABC objects), which will be processed by ParDo.of(new ExecFn()). The pipeline runs on a Spark cluster with 8 workers, and I can see the workers and the DAG visualization in the Spark admin UI. What I would specifically like to visualize, though, is the parallel execution of ExecFn() over the ABC objects. Is there any tool, app, or third-party component that helps show the parallelism happening in the pipeline? Please suggest.

Cheers,
S. Sahayaraj

From: Eugene Kirpichov [mailto:kirpichov@google.com]
Sent: Thursday, June 7, 2018 10:32 PM
To: user@beam.apache.org
Subject: Re: [EXTERNAL] - Re: Create PCollection<ABC> from List<ABC> in DoFn<>

Also, remember that a DoFn can return multiple results:

@ProcessElement
void process(...) {
  for (...) {
    c.output(...);
  }
}


Re: [EXTERNAL] - Re: Create PCollection<ABC> from List<ABC> in DoFn<>

Posted by Eugene Kirpichov <ki...@google.com>.
Also, remember that a DoFn can return multiple results:

@ProcessElement
void process(...) {
  for (...) {
    c.output(...);
  }
}
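
A filled-in sketch of the pattern above, under stated assumptions: String stands in for both the input type and ABC, and ExpandFn and buildList are hypothetical names for the DoFn and for whatever code populates the in-memory list. The point is only that each list item is passed to c.output(...) individually, so the DoFn's output is already a PCollection of items rather than a PCollection of lists.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn: one input element in, several output elements out.
class ExpandFn extends DoFn<String, String> {

  // Hypothetical stand-in for the code that builds the list at runtime.
  static List<String> buildList(String x) {
    List<String> result = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      result.add(x + "-" + i);
    }
    return result;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Emit every item separately instead of emitting the list as one element.
    for (String abc : buildList(c.element())) {
      c.output(abc);
    }
  }
}
```

Applying this with ParDo.of(new ExpandFn()) to a PCollection<String> would yield a PCollection<String> containing three elements per input, which later transforms can process independently.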

On Thu, Jun 7, 2018 at 9:27 AM Robert Bradshaw <ro...@google.com> wrote:

> And if you have a DoFn<X, List<ABC>> you can follow this with
> Flatten.iterables
> <https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/Flatten.Iterables.html> to
> turn the output PCollection<List<ABC>> into a PCollection<ABC>. In some
> cases you may want to follow this with a Reshuffle
> <https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey--> so
> that the outputs from a single X get distributed among multiple machines.

Re: [EXTERNAL] - Re: Create PCollection<ABC> from List<ABC> in DoFn<>

Posted by Robert Bradshaw <ro...@google.com>.
And if you have a DoFn<X, List<ABC>> you can follow this with Flatten.iterables
<https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/Flatten.Iterables.html>
to turn the output PCollection<List<ABC>> into a PCollection<ABC>. In some
cases you may want to follow this with a Reshuffle
<https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey-->
so that the outputs from a single X get distributed among multiple machines.
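
The two steps can be chained as in the following sketch. The class and method names (FlattenThenReshuffle, flattenAndRedistribute) are made up for illustration; Flatten.iterables() and Reshuffle.viaRandomKey() are the transforms linked above. Whether the Reshuffle step is worthwhile depends on the runner and on how expensive the downstream work is, so treat it as optional.

```java
import java.util.List;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical helper class, not part of Beam.
class FlattenThenReshuffle {

  static <T> PCollection<T> flattenAndRedistribute(PCollection<List<T>> lists) {
    return lists
        // PCollection<List<T>> -> PCollection<T>: one output element per list item.
        .apply(Flatten.<T>iterables())
        // Break fusion so items from the same list can be spread across workers.
        .apply(Reshuffle.<T>viaRandomKey());
  }
}
```

With the types from this thread, the input would be the PCollection<List<ABC>> produced by the DoFn<X, List<ABC>>, and the result a PCollection<ABC> ready for ParDo.of(new ExecFn()).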

On Thu, Jun 7, 2018 at 8:19 AM Marián Dvorský <ma...@google.com> wrote:

> If you have a function which given X returns a List<ABC>, you can use
> FlatMapElements
> <https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/FlatMapElements.html> transform
> on PCollection<X> to get a PCollection<ABC>.

Re: [EXTERNAL] - Re: Create PCollection<ABC> from List<ABC> in DoFn<>

Posted by Marián Dvorský <ma...@google.com>.
If you have a function which, given X, returns a List<ABC>, you can use the
FlatMapElements transform
<https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/FlatMapElements.html>
on a PCollection<X> to get a PCollection<ABC>.
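
A minimal sketch of that suggestion, assuming String stands in for both X and ABC. FlatMapExample and expand are hypothetical names; expand plays the role of "a function which, given X, returns a List<ABC>".

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical example class, not part of Beam.
class FlatMapExample {

  // Stand-in for the user's function from X to List<ABC>.
  static List<String> expand(String x) {
    return Arrays.asList(x.split(","));
  }

  static PCollection<String> flatMap(PCollection<String> input) {
    // Each input element may produce zero or more output elements.
    return input.apply(
        FlatMapElements.into(TypeDescriptors.strings())
            .via((String x) -> expand(x)));
  }
}
```

For a custom ABC type, the argument to into(...) would instead be a TypeDescriptor for ABC, and ABC would need a coder that Beam can find or that is set explicitly.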

On Thu, Jun 7, 2018 at 8:16 AM S. Sahayaraj <ss...@quark.com> wrote:

> In case if we could return List<ABC> from DoFn<> then we could use the
> code as suggested in section 3.1.2 and mentioned by you below., but the
> return type of DoFn<> is always PCollection<> in where I could not have the
> list of ABC objects which further will be fed as input for parallel
> computation. Is there any possibility to convert List<ABC> to
> PCollection<ABC> in DoFn<> itself? OR can DoFn<> return List<ABC> objects?

RE: [EXTERNAL] - Re: Create PCollection<ABC> from List<ABC> in DoFn<>

Posted by "S. Sahayaraj" <ss...@quark.com>.
If we could return a List<ABC> from a DoFn<>, then we could use the code suggested in section 3.1.2 and mentioned by you below. But the return type of a DoFn<> is always a PCollection<>, so I cannot get hold of the list of ABC objects that is to be fed as input for parallel computation. Is there any way to convert a List<ABC> to a PCollection<ABC> inside the DoFn<> itself? Or can a DoFn<> return List<ABC> objects?


Cheers,
S. Sahayaraj

From: Robert Bradshaw [mailto:robertwb@google.com]
Sent: Wednesday, June 6, 2018 9:40 PM
To: user@beam.apache.org
Subject: [EXTERNAL] - Re: Create PCollection<ABC> from List<ABC> in DoFn<>

You can use the Create transform to do this, e.g.

  Pipeline p = ...
  List<ABC> inMemoryObjects = ...
  PCollection<ABC> pcollectionOfObject = p.apply(Create.of(inMemoryObjects));
  result = pcollectionOfObject.apply(ParDo.of(SomeDoFn...));

See section 3.1.2 at https://beam.apache.org/documentation/programming-guide/#pcollections


Re: Create PCollection<ABC> from List<ABC> in DoFn<>

Posted by Robert Bradshaw <ro...@google.com>.
You can use the Create transform to do this, e.g.

  Pipeline p = ...
  List<ABC> inMemoryObjects = ...
  PCollection<ABC> pcollectionOfObject = p.apply(Create.of(inMemoryObjects));
  result = pcollectionOfObject.apply(ParDo.of(SomeDoFn...));

See section 3.1.2 at
https://beam.apache.org/documentation/programming-guide/#pcollections
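
For reference, a self-contained sketch of the snippet above, assuming String in place of ABC. CreateFromListExample, UpperCaseFn and build are hypothetical names, with UpperCaseFn standing in for SomeDoFn. Note that Create is applied while the pipeline is being constructed, so the list must already exist in the driver program; it does not help for a list that only comes into existence inside a running DoFn.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical example class, not part of Beam.
class CreateFromListExample {

  // Stand-in for SomeDoFn: any per-element processing.
  static class UpperCaseFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element().toUpperCase());
    }
  }

  static PCollection<String> build(Pipeline p, List<String> inMemoryObjects) {
    // Turn the driver-side list into a PCollection at pipeline construction time.
    PCollection<String> pcollectionOfObject = p.apply(Create.of(inMemoryObjects));
    // Each element can now be processed in parallel by the runner.
    return pcollectionOfObject.apply(ParDo.of(new UpperCaseFn()));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    build(p, Arrays.asList("a", "b", "c"));
    // Running requires a runner (for example the direct runner) on the classpath.
    p.run().waitUntilFinish();
  }
}
```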

On Wed, Jun 6, 2018 at 8:34 AM S. Sahayaraj <ss...@quark.com> wrote:

> Hello,
>
>                 I have created a java class which extends DoFn<>, there
> are list of objects of type ABC (List<ABC>) populated in
> processElement(ProcessContext c) at runtime and would like to generate
> respective PCollection<ABC> from List<ABC> so that the subsequent
> transformation can do parallel execution on each ABC object in
> PCollection<ABC>. How do we create PCollection from in-memory object
> created in DoFn<>? OR How do we get pipeline object being in DoFn<>? OR is
> there any SDK guidelines to refer?
>
>
>
>
>
> Thanks,
>
> S. Sahayaraj
>