Posted to user@beam.apache.org by "Jiayue Zhang (Bravo)" <mz...@gmail.com> on 2018/11/14 01:28:27 UTC

Multiple concurrent transforms with SparkRunner

Hi,

I'm writing Java Beam code to run on both Dataflow and Spark. The input
files are in TFRecord format and come from multiple directories. The Java
TFRecordIO doesn't have a readAll from a list of files, so what I'm doing is:

for (String dir: listOfDirs) {
    p.apply(TFRecordIO.read().from(dir))
     .apply(ParDo.of(new BatchElements()))
     .apply(ParDo.of(new Process()))
     .apply(Combine.globally(new CombineResult()))
     .apply(TextIO.write().to(dir))
}

These directories are fairly independent and I only need the result of each
directory. When running on Dataflow, these directories are processed
concurrently. But when running with Spark, I saw that the Spark jobs and
stages are sequential: it has to finish all steps for one directory before
moving to the next one. What's the way to make multiple transforms run
concurrently with the SparkRunner?

Re: Multiple concurrent transforms with SparkRunner

Posted by "Jiayue Zhang (Bravo)" <mz...@gmail.com>.
Hi Thomas,

Thanks for your response. Scheduler mode doesn't help here, because Spark
only ever sees one active job at a time. I wonder if there are ways, like
spinning up some threads or providing a custom SparkContext, to make
multiple Spark jobs active at the same time.
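
Roughly, what I have in mind is something like the sketch below (untested,
and reusing the DoFn/Combine names from my first mail): share one provided
JavaSparkContext across the per-directory pipelines via SparkContextOptions
and submit each pipeline from its own thread, so that Spark sees several
active jobs which a FAIR scheduler could then interleave. I haven't verified
that the SparkRunner actually overlaps the jobs this way:

// Untested sketch. Key classes: org.apache.beam.runners.spark.SparkRunner,
// org.apache.beam.runners.spark.SparkContextOptions,
// org.apache.spark.SparkConf, org.apache.spark.api.java.JavaSparkContext.
SparkConf conf = new SparkConf()
    .setAppName("multi-dir-pipelines")
    .set("spark.scheduler.mode", "FAIR");  // only matters with concurrent jobs
JavaSparkContext jsc = new JavaSparkContext(conf);

ExecutorService pool = Executors.newFixedThreadPool(listOfDirs.size());
List<Future<PipelineResult.State>> results = new ArrayList<>();

for (String dir : listOfDirs) {
    results.add(pool.submit(() -> {
        SparkContextOptions options =
            PipelineOptionsFactory.as(SparkContextOptions.class);
        options.setRunner(SparkRunner.class);
        options.setUsesProvidedSparkContext(true);
        options.setProvidedSparkContext(jsc);

        // Same per-directory pipeline as before, now one Pipeline per thread.
        Pipeline p = Pipeline.create(options);
        p.apply(TFRecordIO.read().from(dir))
         .apply(ParDo.of(new BatchElements()))
         .apply(ParDo.of(new Process()))
         .apply(Combine.globally(new CombineResult()))
         .apply(TextIO.write().to(dir));
        return p.run().waitUntilFinish();
    }));
}

for (Future<PipelineResult.State> r : results) {
    r.get();  // surface failures from any of the pipelines
}
pool.shutdown();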

On Wed, Nov 14, 2018 at 4:18 AM Thomas Fion <tf...@talend.com> wrote:

> Hi Juan,
>
> Try to play with the spark.scheduler.mode property (setting it to "FAIR"
> for instance).
> By default, Spark runs its jobs/stages in FIFO mode, meaning that in your
> case, if it can allocate all workers to writing one of the directories, it
> will process one directory after the other.
>
> Brgds
>
> Juan Carlos Garcia wrote on 14/11/2018 at 07:18:
>
> I suggest playing around with some Spark configuration options, like the
> dynamic allocation parameters:
>
> https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
>
>
>
> On Wed, Nov 14, 2018, at 02:28, Jiayue Zhang (Bravo) <
> mzhang1230@gmail.com> wrote:
>
>> Hi,
>>
>> I'm writing Java Beam code to run on both Dataflow and Spark. The input
>> files are in TFRecord format and come from multiple directories. The Java
>> TFRecordIO doesn't have a readAll from a list of files, so what I'm doing is:
>>
>> for (String dir: listOfDirs) {
>>     p.apply(TFRecordIO.read().from(dir))
>>      .apply(ParDo.of(new BatchElements()))
>>      .apply(ParDo.of(new Process()))
>>      .apply(Combine.globally(new CombineResult()))
>>      .apply(TextIO.write().to(dir))
>> }
>>
>> These directories are fairly independent and I only need the result of each
>> directory. When running on Dataflow, these directories are processed
>> concurrently. But when running with Spark, I saw that the Spark jobs and
>> stages are sequential: it has to finish all steps for one directory before
>> moving to the next one. What's the way to make multiple transforms run
>> concurrently with the SparkRunner?
>>
>

Re: Multiple concurrent transforms with SparkRunner

Posted by Thomas Fion <tf...@talend.com>.
Hi Juan,

Try to play with the spark.scheduler.mode property (setting it to "FAIR" for instance).
By default, Spark runs its jobs/stages in FIFO mode, meaning that in your case, if it can allocate all workers to writing one of the directories, it will process one directory after the other.
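
The property is a plain Spark configuration entry, so it can be set with --conf spark.scheduler.mode=FAIR at submit time, or (sketch only, not verified against the Beam SparkRunner) on the SparkConf of a provided context:

// Sketch: FAIR scheduling on the context the SparkRunner will use.
SparkConf conf = new SparkConf()
    .setAppName("beam-on-spark")
    .set("spark.scheduler.mode", "FAIR");
JavaSparkContext jsc = new JavaSparkContext(conf);
// The context could then be handed to the runner via SparkContextOptions
// (setUsesProvidedSparkContext / setProvidedSparkContext).

Note that FAIR mode only changes how concurrently submitted jobs share executors; it does not by itself create concurrent jobs.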

Brgds

Juan Carlos Garcia wrote on 14/11/2018 at 07:18:
I suggest playing around with some Spark configuration options, like the dynamic allocation parameters:

https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation



On Wed, Nov 14, 2018, at 02:28, Jiayue Zhang (Bravo) <mz...@gmail.com> wrote:
Hi,

I'm writing Java Beam code to run on both Dataflow and Spark. The input files are in TFRecord format and come from multiple directories. The Java TFRecordIO doesn't have a readAll from a list of files, so what I'm doing is:

for (String dir: listOfDirs) {
    p.apply(TFRecordIO.read().from(dir))
     .apply(ParDo.of(new BatchElements()))
     .apply(ParDo.of(new Process()))
     .apply(Combine.globally(new CombineResult()))
     .apply(TextIO.write().to(dir))
}

These directories are fairly independent and I only need the result of each directory. When running on Dataflow, these directories are processed concurrently. But when running with Spark, I saw that the Spark jobs and stages are sequential: it has to finish all steps for one directory before moving to the next one. What's the way to make multiple transforms run concurrently with the SparkRunner?





Re: Multiple concurrent transforms with SparkRunner

Posted by "Jiayue Zhang (Bravo)" <mz...@gmail.com>.
Hi Juan,

Thanks for your response. Dynamic allocation is already enabled. It's not
about resource allocation but about how Spark schedules jobs and stages.

On Tue, Nov 13, 2018 at 10:19 PM Juan Carlos Garcia <jc...@gmail.com>
wrote:

> I suggest playing around with some Spark configuration options, like the
> dynamic allocation parameters:
>
> https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
>
>
>
> On Wed, Nov 14, 2018, at 02:28, Jiayue Zhang (Bravo) <
> mzhang1230@gmail.com> wrote:
>
>> Hi,
>>
>> I'm writing Java Beam code to run on both Dataflow and Spark. The input
>> files are in TFRecord format and come from multiple directories. The Java
>> TFRecordIO doesn't have a readAll from a list of files, so what I'm doing is:
>>
>> for (String dir: listOfDirs) {
>>     p.apply(TFRecordIO.read().from(dir))
>>      .apply(ParDo.of(new BatchElements()))
>>      .apply(ParDo.of(new Process()))
>>      .apply(Combine.globally(new CombineResult()))
>>      .apply(TextIO.write().to(dir))
>> }
>>
>> These directories are fairly independent and I only need the result of each
>> directory. When running on Dataflow, these directories are processed
>> concurrently. But when running with Spark, I saw that the Spark jobs and
>> stages are sequential: it has to finish all steps for one directory before
>> moving to the next one. What's the way to make multiple transforms run
>> concurrently with the SparkRunner?
>>
>

Re: Multiple concurrent transforms with SparkRunner

Posted by Juan Carlos Garcia <jc...@gmail.com>.
I suggest playing around with some Spark configuration options, like the
dynamic allocation parameters:

https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
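
For example, something along these lines (illustrative values only; the
property names are from the page above, and on YARN the external shuffle
service also has to be enabled for dynamic allocation to work):

// Illustrative sketch only -- the values are placeholders and depend on
// your cluster.
SparkConf conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")  // required on YARN
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "20");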



On Wed, Nov 14, 2018, at 02:28, Jiayue Zhang (Bravo) <mz...@gmail.com>
wrote:

> Hi,
>
> I'm writing Java Beam code to run on both Dataflow and Spark. The input
> files are in TFRecord format and come from multiple directories. The Java
> TFRecordIO doesn't have a readAll from a list of files, so what I'm doing is:
>
> for (String dir: listOfDirs) {
>     p.apply(TFRecordIO.read().from(dir))
>      .apply(ParDo.of(new BatchElements()))
>      .apply(ParDo.of(new Process()))
>      .apply(Combine.globally(new CombineResult()))
>      .apply(TextIO.write().to(dir))
> }
>
> These directories are fairly independent and I only need the result of each
> directory. When running on Dataflow, these directories are processed
> concurrently. But when running with Spark, I saw that the Spark jobs and
> stages are sequential: it has to finish all steps for one directory before
> moving to the next one. What's the way to make multiple transforms run
> concurrently with the SparkRunner?
>