Posted to user@beam.apache.org by Chris Hebert <ch...@digitalreasoning.com> on 2017/08/02 15:02:30 UTC

Un-parallelized TextIO to HDFS on Beam+Flink+YARN

Hi,

I ran a Beam+Flink+YARN job with many containers and a high
"parallelism.default" parameter in my conf/flink-conf.yaml file.

That all worked perfectly with all the containers parallelizing all the
parts of the job up until the very end at a TextIO.write().

The last Task running was a "DataSink
(org.apache.flink.api.java.io.DiscardingOutputFormat)", during which I could
finally observe output being written to HDFS. (The pipeline was reading from
the file system in batch mode, so I'm not entirely shocked that writing the
output was saved for the end.)

The problem is: this Task used only one TaskManager/Container and ran all by
itself for the last 15 minutes of the pipeline while all the other
TaskManagers/Containers sat idle.

How can I make sure that this gets parallelized the same as all the other
Tasks?

Should I address this through the Beam API or through some Flink
configuration parameter I haven't found yet?

Is it even possible to have multiple TaskManagers writing the TextIO output
to HDFS at the same time?
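
For concreteness, the write looks roughly like this (the paths and the shard
count are illustrative, not my exact code, and I'm not sure whether fixing the
shard count via withNumShards is even the right lever):

```java
// Illustrative sketch only: assumes Beam's Java SDK on the classpath
// (org.apache.beam:beam-sdks-java-core) plus the HDFS filesystem module.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WriteSketch {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(TextIO.read().from("hdfs://namenode/input/*"))   // path illustrative
         .apply(TextIO.write()
             .to("hdfs://namenode/output/part")                  // path illustrative
             .withNumShards(32));  // illustrative; 0 (the default) lets the runner decide
        p.run().waitUntilFinish();
    }
}
```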

Thank you for your help,
Chris

Re: Un-parallelized TextIO to HDFS on Beam+Flink+YARN

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I think this might not be a problem. The reason we have this DataSink(DiscardingOutputFormat) at the "end" of Flink batch pipelines is that Flink's batch API will not execute a chain of operations unless it is terminated by a sink. In Beam it's fine to have a DoFn with no sink after it, because the DoFn can itself write data to an external system. In fact, the TextIO.write() operation was implemented as a combination of several DoFns, last time I checked. I would assume that this "terminator" sink is just waiting until the DoFns that do the actual work are done writing.
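
The pattern is roughly the following (a plain-Java sketch of the idea, not Beam's actual code): each shard is written to a temporary file in parallel, and then a single finalize step renames the temporary files into place.

```java
import java.nio.file.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-in for the sharded-write pattern: N workers each write
// their own temporary shard concurrently, then one finalize phase renames
// the temp files to their final "part-i-of-N" names.
public class ShardedWriteSketch {
    public static List<Path> write(List<String> lines, Path dir, int numShards)
            throws Exception {
        Files.createDirectories(dir);
        ExecutorService pool = Executors.newFixedThreadPool(numShards);
        try {
            // 1. Parallel phase: shard i writes a temp file with its slice of the data.
            List<Future<Path>> tmp = IntStream.range(0, numShards)
                .mapToObj(i -> pool.submit(() -> {
                    List<String> slice = IntStream.range(0, lines.size())
                        .filter(j -> j % numShards == i)   // round-robin assignment
                        .mapToObj(lines::get)
                        .collect(Collectors.toList());
                    return Files.write(dir.resolve("temp-" + i), slice);
                }))
                .collect(Collectors.toList());
            // 2. Finalize phase (single-threaded): rename temp files into place.
            List<Path> out = new java.util.ArrayList<>();
            for (int i = 0; i < numShards; i++) {
                Path fin = dir.resolve(String.format("part-%05d-of-%05d", i, numShards));
                Files.move(tmp.get(i).get(), fin, StandardCopyOption.REPLACE_EXISTING);
                out.add(fin);
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }
}
```

The point of the sketch is that the parallel phase does the heavy lifting, while the terminator only coordinates the cheap finalize step.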

Could you maybe post the execution plan that you see in the Flink Dashboard? Either here or in private to me, if you don't want to share that on the ML.

Best,
Aljoscha

> On 2. Aug 2017, at 17:02, Chris Hebert <ch...@digitalreasoning.com> wrote: