Posted to user@beam.apache.org by kaniska Mandal <ka...@gmail.com> on 2016/05/05 18:09:15 UTC

What's the recommended approach to run parallel operations inside ParDo?

Inside a CompositeTransformation --
>  is it OK to spawn threads / use a CountDownLatch to perform multiple
ParDos on the same data item at the same time,
> or are all the calls to the individual ParDos inherently parallelized?

For example, I need to execute generateAndStoreGraphData(),
generateAndStoreTimeSerieseData1(), and generateAndStoreTimeSerieseData2()
-- in parallel.

static class MultiOps extends PTransform<PCollection<T>, PCollection<T>> {

    @Override
    public PCollection<T> apply(PCollection<T> dataSet) {

      dataSet.apply(ParDo.of(generateAndStoreGraphData()));

      dataSet.apply(ParDo.of(generateAndStoreTimeSerieseData1()));
      dataSet.apply(ParDo.of(generateAndStoreTimeSerieseData2()));

      // The three stores are side effects; pass the input through unchanged.
      return dataSet;
    }

  }

=====================

Thanks

Kaniska

Re: What's the recommended approach to run parallel operations inside ParDo?

Posted by Lukasz Cwik <lc...@google.com>.
As I was saying, each runner may choose how it parallelizes work. To give
a concrete example, the Google Cloud Dataflow runner will do inter-bundle
parallelization (different bundles of T's will process through OP1, OP2,
and OP3 in parallel) but not intra-bundle parallelization (where T's within
the same bundle would process through OP1, OP2, and OP3 in parallel). If
there are very few T's, so you are limited on parallelism there (let's say
on the order of the number of cores processing the work), then it would be
to your benefit to perform a reshuffle
<https://github.com/apache/incubator-beam/blob/5bdea1e2bfce7b34a877f198026065b08b36b760/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java>
before each operation (with some random key) to force a break. This would
come at the cost of materializing the dataset three times, but it can be
worth it if the dataset has limited parallelism.
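
To make that concrete, here is a rough, untested sketch of the
reshuffle-with-random-key break (the anonymous DoFns are placeholders I am
making up; Reshuffle is the utility linked above):

// Key each element randomly, reshuffle to force a fusion break, then
// drop the keys again before the next operation.
PCollection<T> broken = dataSet
    .apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
      @Override
      public void processElement(ProcessContext c) {
        // A random key spreads elements evenly across workers.
        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
      }
    }))
    .apply(Reshuffle.<Integer, T>of())
    .apply(ParDo.of(new DoFn<KV<Integer, T>, T>() {
      @Override
      public void processElement(ProcessContext c) {
        c.output(c.element().getValue());
      }
    }));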



Re: What's the recommended approach to run parallel operations inside ParDo?

Posted by kaniska Mandal <ka...@gmail.com>.
Thanks much for reflecting on this requirement.

Yes, it is good enough that different T's go through the
generateAndStore... in parallel.

Is it possible to execute all the ParDos in parallel through this
pipeline?
=> pipeline.apply(consumeStreams).apply(ParDo1).apply(ParDo2).apply(ParDo3)


I thought each ParDo would be executed as a sequential step (of course,
internally it will act on each data item in parallel). Isn't that so, or is
there some other mechanism?
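
To show the contrast I have in mind (a rough sketch only -- Fn1/Fn2/Fn3 are
placeholder DoFns, not real code from our pipeline):

// Chained: each step consumes the previous step's output, so the
// three ParDos form a sequential graph.
PCollection<T> chained = input
    .apply(ParDo.of(new Fn1()))
    .apply(ParDo.of(new Fn2()))
    .apply(ParDo.of(new Fn3()));

// Fanned out: all three ParDos consume the same input, so they form
// parallel branches of the graph.
PCollection<T> out1 = input.apply(ParDo.of(new Fn1()));
PCollection<T> out2 = input.apply(ParDo.of(new Fn2()));
PCollection<T> out3 = input.apply(ParDo.of(new Fn3()));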

We have two types of requirements:
(a) parallelize multiple data-persistence operations
 > as explained in the previous email

(b) parallelize multiple data-lookup operations
 > each ParDo may need to look up different external systems to perform some
quick validation / normalization against the current data item, hence we
need to parallelize the lookups


For the sake of simplicity, I have added some sample snippets below, where
I need to parallelize OP1, OP2, and OP3, either on the same data item or on
different data items.

/////////////  /////////////////  //////////////////

public static void main(String[] args) {

  FlinkOptions options =
      PipelineOptionsFactory.create().as(FlinkOptions.class);
  options.setRunner(FlinkPipelineRunner.class);

  Pipeline p = Pipeline.create(options);

  PCollection<String> w1 = p.apply(Create.of(
      "Nasa Calling Aliens", "Spaceship hovering on Texas."));
  PCollection<String> w2 = p.apply(Create.of(
      "Aliens live in Alpha Century.", "Search for ET is on.",
      "ET is my favourite Spielberg movie."));

  PCollectionList<String> list = PCollectionList.of(w1).and(w2);
  PCollection<String> union = list.apply(Flatten.<String>pCollections());

  PCollectionTuple tuple = union.apply(new CountWords());

  // ... omitting console logs

  p.run();
}

//////////////////////

public static class CountWords
    extends PTransform<PCollection<String>, PCollectionTuple> {

  // Declarations for the tags referenced below (omitted earlier).
  static final TupleTag<KV<String, Long>> tag1 = new TupleTag<KV<String, Long>>() {};
  static final TupleTag<KV<String, Long>> tag2 = new TupleTag<KV<String, Long>>() {};
  static final TupleTag<KV<String, Long>> tag3 = new TupleTag<KV<String, Long>>() {};

  @Override
  public PCollectionTuple apply(PCollection<String> lines) {

    // OP-1
    PCollection<KV<String, Long>> pc1 = lines
        .apply(ParDo.of(new ExtractWordsFn("Aliens")))
        .apply(Count.<String>perElement());

    // OP-2
    PCollection<KV<String, Long>> pc2 = lines
        .apply(ParDo.of(new ExtractWordsFn("ET")))
        .apply(Count.<String>perElement());

    // OP-3
    PCollection<KV<String, Long>> pc3 = lines
        .apply(ParDo.of(new ExtractWordsFn("Spaceship")))
        .apply(Count.<String>perElement());

    return PCollectionTuple.of(tag1, pc1).and(tag2, pc2).and(tag3, pc3);
  }
}

/////////////


Re: What's the recommended approach to run parallel operations inside ParDo?

Posted by Lukasz Cwik <lc...@google.com>.
How the work is parallelized is dependent on the runner, but you should
strive to keep your DoFns as simple as possible and avoid additional
complexity. Generally a runner will parallelize work automatically by
splitting the source the data is coming from so that each worker is busy;
for example, it may use signals such as CPU utilization to increase the
number of bundles of work that are processed in parallel per worker. This
allows people to write DoFns that are as simple as possible and not have
to worry about how to make them run faster.

But there are practical considerations: work can only be split down to the
number of elements there are, and remote calls may be much more efficient
if done in batches. This is a case-by-case decision, and different
strategies work better for different problems.
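
As a rough illustration of the batching idea (a hypothetical sketch --
ExternalClient.lookupAll() stands in for whatever batched remote call you
have, and emitting from finishBundle assumes bundle-boundary output is
acceptable for your windowing):

// Buffer elements per bundle and issue one remote call per batch.
static class BatchedLookupFn extends DoFn<String, String> {
  private static final int MAX_BATCH_SIZE = 100;
  private transient List<String> buffer;

  @Override
  public void startBundle(Context c) {
    buffer = new ArrayList<>();
  }

  @Override
  public void processElement(ProcessContext c) throws Exception {
    buffer.add(c.element());
    if (buffer.size() >= MAX_BATCH_SIZE) {
      flush(c);
    }
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    flush(c);
  }

  private void flush(Context c) throws Exception {
    // One remote call for the whole batch instead of one per element.
    for (String result : ExternalClient.lookupAll(buffer)) {
      c.output(result);
    }
    buffer.clear();
  }
}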

Also, as a clarifying point: do generateAndStoreGraphData(),
generateAndStoreTimeSerieseData1(), and generateAndStoreTimeSerieseData2()
need to be called for the same T in parallel (because of some internal
requirement), or is it good enough that different T's go through the
generateAndStore... in parallel?
