Posted to user@beam.apache.org by Pawel Szczur <pa...@gmail.com> on 2016/05/29 23:38:54 UTC

TextIO with .withoutSharding() writing only one shard of data and ignoring the rest

Hi,

I'm running a pipeline with the Flink runner, bleeding-edge Beam, Oracle Java
1.8, Maven 3.3.3 on linux64.

The pipeline is run with --parallelism=6.

Adding .withoutSharding() causes a TextIO sink to write only one of the
shards.

Example use:
data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats"));
vs.
data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats")
    .withoutSharding());

Result:
Only part of the data is written to the file. Compared with the sharded
output, it appears to be just one of the shard files.
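A quick way to quantify the loss is to count the lines in both outputs. This Java sketch assumes both runs wrote under /tmp, that the sharded run produced names like some-stats-00000-of-00006, and that the class name CompareOutputs is purely hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical helper: compares the sharded and unsharded outputs of the same pipeline.
class CompareOutputs {
    // Sums the line counts of every file in `dir` whose name matches the glob `pattern`.
    static long countLines(Path dir, String pattern) throws IOException {
        long total = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, pattern)) {
            for (Path file : files) {
                try (Stream<String> lines = Files.lines(file)) {
                    total += lines.count();
                }
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get("/tmp");
        // Assumed names: "some-stats-NNNNN-of-NNNNN" for the sharded run, "some-stats" without sharding.
        long sharded = countLines(dir, "some-stats-*-of-*");
        long unsharded = countLines(dir, "some-stats");
        System.out.println("sharded=" + sharded + ", unsharded=" + unsharded);
    }
}
```

If the bug is present, the unsharded count should match the line count of a single shard rather than the sharded total.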

Cheers,
Pawel

Re: TextIO with .withoutSharding() writing only one shard of data and ignoring the rest

Posted by Pawel Szczur <pa...@gmail.com>.
I've opened an issue:
https://issues.apache.org/jira/browse/BEAM-365

2016-06-07 11:20 GMT+02:00 Aljoscha Krettek <al...@apache.org>:

> At least for Flink, a solution could be to set the parallelism of the
> "WriteBundles" ParDo to the number of shards. Then the behavior would be
> correct. The only problem is that the Flink runner (or any runner) doesn't
> know about the special nature of these three ParDo functions.

Re: TextIO with .withoutSharding() writing only one shard of data and ignoring the rest

Posted by Aljoscha Krettek <al...@apache.org>.
At least for Flink, a solution could be to set the parallelism of the
"WriteBundles" ParDo to the number of shards. Then the behavior would be
correct. The only problem is that the Flink runner (or any runner) doesn't
know about the special nature of these three ParDo functions.
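The proposal can be sketched in plain Java without any Beam dependency. The class FixedShardParallelism and its methods are hypothetical, and round-robin distribution is only an assumption standing in for however a runner spreads records across WriteBundles instances. The point is that with one instance there is exactly one temporary file holding every record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: run "WriteBundles" with exactly numShards parallel instances,
// so that each instance produces exactly one shard.
class FixedShardParallelism {
    // In this sketch, numShards <= 0 stands for "runner-determined sharding".
    static int writeBundlesParallelism(int pipelineParallelism, int numShards) {
        return numShards > 0 ? numShards : pipelineParallelism;
    }

    // Round-robins the records over `instances` WriteBundles instances;
    // each inner list models one intermediate (temporary) file.
    static List<List<String>> writeBundles(List<String> records, int instances) {
        List<List<String>> bundles = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            bundles.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            bundles.get(i % instances).add(records.get(i));
        }
        return bundles;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            records.add("row-" + i);
        }
        // withoutSharding() means numShards = 1, even though the job runs with --parallelism=6.
        int instances = writeBundlesParallelism(6, 1);
        List<List<String>> bundles = writeBundles(records, instances);
        System.out.println(bundles.size() + " temp file(s), " + bundles.get(0).size() + " records in the first");
    }
}
```

Running main prints "1 temp file(s), 12 records in the first", so Finalize would have a single file to copy and nothing would be overwritten.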

On Tue, 7 Jun 2016 at 01:16 Robert Bradshaw <ro...@google.com> wrote:

> A preferable solution would be to augment the Beam sinks to support
> these parameters. At the very least, we should probably make running
> these with fixed shards a loud error in the meantime.

Re: TextIO with .withoutSharding() writing only one shard of data and ignoring the rest

Posted by Robert Bradshaw <ro...@google.com>.
A preferable solution would be to augment the Beam sinks to support
these parameters. At the very least, we should probably make running
these with fixed shards a loud error in the meantime.
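A loud error could take the form of a check that runs before the pipeline executes. This is a hypothetical sketch: ShardingCheck and its parameters are illustrative, not Beam API, and it assumes the runner knows whether it can honor a fixed shard count.

```java
// Hypothetical validation a runner could apply until sinks honor shard counts themselves.
class ShardingCheck {
    // numShards <= 0 means "runner-determined sharding" and is always accepted.
    static void checkShardingSupported(String transformName, int numShards, boolean runnerHonorsShardCount) {
        if (numShards > 0 && !runnerHonorsShardCount) {
            throw new UnsupportedOperationException(
                transformName + " requests " + numShards + " shard(s), but this runner cannot control"
                    + " the number of output shards; refusing to run rather than silently dropping data.");
        }
    }

    public static void main(String[] args) {
        checkShardingSupported("write-debug-csv", 0, false); // default sharding: accepted
        try {
            checkShardingSupported("write-debug-csv", 1, false); // withoutSharding(): rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Failing at construction time trades a confusing partial output for an explicit message, which is the behavior suggested above.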

On Wed, Jun 1, 2016 at 4:17 AM, Aljoscha Krettek <al...@apache.org> wrote:
> Hi,
> the issue is a bit more complicated and involves the Beam sink API and the
> Flink runner.
>
> I'll have to get a bit into how Beam sinks work. The base class for sinks is
> Sink (TextIO.write gets translated to Write.to(new TextSink())). Write.to
> normally gets translated to three ParDo operations that cooperate to do the
> writing:
>
>  - "Initialize": this performs the initial setup of the Sink. It runs only
> once per sink, non-parallel.
>
>  - "WriteBundles": this gets an initialized sink as a side input and the
> values to write on the main input. This runs in parallel, so for Flink, if
> you set parallelism=6 you'll get 6 parallel instances of this operation at
> runtime. This operation forwards information about where it wrote
> downstream. It does not write to the final file location but to an
> intermediate staging location.
>
>  - "Finalize": this gets the initialized sink on the main input and the
> information about written files from "WriteBundles" as a side input. This
> also runs only once, non-parallel. Here we copy the intermediate files to
> their final location based on the sharding template.
>
> The problem is that Write.to() and TextSink, as well as all other sinks, are
> not aware of the number of shards. If you call withoutSharding(), this sets
> the shard template to "" (empty string) and the number of shards to 1.
> "WriteBundles", however, is not aware of this and will write 6 intermediate
> files if you set parallelism=6. In "Finalize" we then copy each of the 6
> intermediate files to the same final location, because the empty sharding
> template gives every shard the same name. The end result is that you only
> get one of the six result shards.
>
> The reason this only occurs with the Flink runner is that all other runners
> have special overrides for TextIO.Write and AvroIO.Write that kick in if
> sharding control is required. So, for the time being, this is a Flink runner
> bug, and we might have to introduce special overrides as well until this is
> solved in the general case.
>
> Cheers,
> Aljoscha

Re: TextIO with .withoutSharding() writing only one shard of data and ignoring the rest

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
the issue is a bit more complicated and involves the Beam sink API and the
Flink runner.

I'll have to get a bit into how Beam sinks work. The base class for sinks
is Sink (TextIO.write gets translated to Write.to(new TextSink())).
Write.to normally gets translated to three ParDo operations that cooperate
to do the writing:

 - "Initialize": this performs the initial setup of the Sink. It runs only
once per sink, non-parallel.

 - "WriteBundles": this gets an initialized sink as a side input and the
values to write on the main input. This runs in parallel, so for Flink, if
you set parallelism=6 you'll get 6 parallel instances of this operation at
runtime. This operation forwards information about where it wrote
downstream. It does not write to the final file location but to an
intermediate staging location.

 - "Finalize": this gets the initialized sink on the main input and the
information about written files from "WriteBundles" as a side input. This
also runs only once, non-parallel. Here we copy the intermediate files to
their final location based on the sharding template.
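The sharding template used by "Finalize" can be illustrated with a small name-expansion function. This sketch assumes the convention that a run of 'S' stands for the zero-padded shard index and a run of 'N' for the zero-padded shard count, as in the default "-SSSSS-of-NNNNN"; the class ShardTemplate is hypothetical and not the Beam implementation.

```java
import java.util.Locale;

// Hypothetical sketch of how "Finalize" could derive final file names from a shard template.
class ShardTemplate {
    // Replaces each run of 'S' with the zero-padded shard index and each run of 'N'
    // with the zero-padded shard count; all other characters are copied unchanged.
    static String expand(String prefix, String template, int shard, int numShards) {
        StringBuilder out = new StringBuilder(prefix);
        int i = 0;
        while (i < template.length()) {
            char c = template.charAt(i);
            if (c == 'S' || c == 'N') {
                int start = i;
                while (i < template.length() && template.charAt(i) == c) {
                    i++;
                }
                int width = i - start;
                int value = (c == 'S') ? shard : numShards;
                out.append(String.format(Locale.ROOT, "%0" + width + "d", value));
            } else {
                out.append(c);
                i++;
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        for (int shard = 0; shard < 6; shard++) {
            System.out.println(expand("/tmp/some-stats", "-SSSSS-of-NNNNN", shard, 6));
        }
        // An empty template (as set by withoutSharding()) maps every shard to the bare prefix.
        System.out.println(expand("/tmp/some-stats", "", 3, 1));
    }
}
```

With the default template the six shards get distinct names from /tmp/some-stats-00000-of-00006 to /tmp/some-stats-00005-of-00006, while with the empty template every shard index maps to /tmp/some-stats.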

The problem is that Write.to() and TextSink, as well as all other sinks,
are not aware of the number of shards. If you call withoutSharding(), this
sets the shard template to "" (empty string) and the number of shards to 1.
"WriteBundles", however, is not aware of this and will write 6 intermediate
files if you set parallelism=6. In "Finalize" we then copy each of the 6
intermediate files to the same final location, because the empty sharding
template gives every shard the same name. The end result is that you only
get one of the six result shards.
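The collision can be reproduced with a map standing in for the file system. This is a hypothetical simulation (LostShardsDemo is illustrative, not Beam code): six temporary files are copied to the name an empty template produces, which is just the prefix, and each copy overwrites the previous one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: six WriteBundles instances each stage a temp file,
// and Finalize copies every temp file to a name derived from an empty shard template.
class LostShardsDemo {
    static Map<String, List<String>> finalizeWithEmptyTemplate(String prefix, List<List<String>> tempFiles) {
        Map<String, List<String>> fileSystem = new HashMap<>();
        for (List<String> temp : tempFiles) {
            // An empty template adds nothing to the prefix, so every temp file gets the
            // same destination and each put() replaces the previous contents.
            fileSystem.put(prefix, temp);
        }
        return fileSystem;
    }

    public static void main(String[] args) {
        List<List<String>> tempFiles = new ArrayList<>();
        for (int bundle = 0; bundle < 6; bundle++) {
            List<String> rows = new ArrayList<>();
            rows.add("row-from-bundle-" + bundle);
            tempFiles.add(rows);
        }
        Map<String, List<String>> result = finalizeWithEmptyTemplate("/tmp/some-stats", tempFiles);
        System.out.println(result.size() + " file: " + result.get("/tmp/some-stats"));
    }
}
```

Running main prints "1 file: [row-from-bundle-5]". In a real run, which bundle survives depends on the order in which Finalize processes the temporary files.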

The reason this only occurs with the Flink runner is that all other runners
have special overrides for TextIO.Write and AvroIO.Write that kick in if
sharding control is required. So, for the time being, this is a Flink runner
bug, and we might have to introduce special overrides as well until this is
solved in the general case.

Cheers,
Aljoscha

On Wed, 1 Jun 2016 at 07:37 Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:

> Yes, I just tested it: it happens only with the Flink runner.
>
> I agree we should create a JIRA issue.
>
> Regards
> JB

Re: TextIO with .withoutSharding() writing only one shard of data and ignoring the rest

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Yes, I just tested it: it happens only with the Flink runner.

I agree we should create a JIRA issue.

Regards
JB

On 06/01/2016 03:41 AM, Davor Bonaci wrote:
> This is likely a runner-specific issue. It would be best to file a
> JIRA issue for this.

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: TextIO with .withoutSharding() writing only one shard of data and ignoring the rest

Posted by Davor Bonaci <da...@google.com>.
This is likely a runner-specific issue. It would be best to file a JIRA
issue for this.

On Tue, May 31, 2016 at 9:46 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Pawel,
>
> Does it happen only with the Flink runner? I bet it happens with any
> runner.
>
> Let me take a look.
>
> Regards
> JB

Re: TextIO with .withoutSharding() writing only one shard of data and ignoring the rest

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Pawel,

Does it happen only with the Flink runner? I bet it happens with any
runner.

Let me take a look.

Regards
JB

On 05/30/2016 01:38 AM, Pawel Szczur wrote:
> Hi,
>
> I'm running a pipeline with the Flink runner, bleeding-edge Beam, Oracle
> Java 1.8, Maven 3.3.3 on linux64.
>
> The pipeline is run with --parallelism=6.
>
> Adding .withoutSharding() causes a TextIO sink to write only one of the
> shards.
>
> Example use:
> data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats"));
> vs.
> data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats")
>     .withoutSharding());
>
> Result:
> Only part of the data is written to the file. Compared with the sharded
> output, it appears to be just one of the shard files.
>
> Cheers,
> Pawel
