Posted to commits@beam.apache.org by "Pawel Szczur (JIRA)" <ji...@apache.org> on 2016/06/21 14:09:57 UTC

[jira] [Updated] (BEAM-365) TextIO withoutSharding causes Flink to throw IllegalStateException

     [ https://issues.apache.org/jira/browse/BEAM-365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pawel Szczur updated BEAM-365:
------------------------------
    Description: 
The exception: 
{code}java.lang.IllegalStateException: Shard name template '' only generated 1 distinct file names for 3 files{code}
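
A stand-alone sketch of why the check fires (the constructName helper
below is a hypothetical stand-in for Beam's shard-name construction, not
the real API): an empty template has no placeholder to substitute the
shard number into, so every shard collapses to the same file name.

{code}
import java.util.HashSet;
import java.util.Set;

public class ShardTemplateCheck {
  // Hypothetical stand-in: substitute shard/count into template
  // placeholders. An empty template never varies, so all names collide.
  static String constructName(String prefix, String template, int shard, int numShards) {
    return prefix + template
        .replace("SSSSS", String.format("%05d", shard))
        .replace("NNNNN", String.format("%05d", numShards));
  }

  public static void main(String[] args) {
    int numFiles = 3;
    Set<String> names = new HashSet<>();
    for (int shard = 0; shard < numFiles; shard++) {
      names.add(constructName("output", "", shard, numFiles));
    }
    if (names.size() != numFiles) {
      throw new IllegalStateException("Shard name template '' only generated "
          + names.size() + " distinct file names for " + numFiles + " files");
    }
  }
}
{code}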

The initial discussion took place some time ago; at that time,
{{withoutSharding}} was silently ignored by the runner.

Explanation from Aljoscha Krettek:
{quote}
Hi,
the issue is a bit more complicated and involves the Beam sink API and the
Flink runner.

I'll have to get a bit into how Beam sinks work. The base class for sinks
is Sink (TextIO.Write gets translated to Write.to(new TextSink())).
Write.to normally gets translated to three ParDo operations that
cooperate to do the writing (see the toy model sketched after this list):

 - "Initialize": this does initial initialization of the Sink, this is run
only once, per sink, non-parallel.

 - "WriteBundles": this gets an initialized sink on a side-input and the
values to write on the main input. This runs in parallel, so for Flink, if
you set parallelism=6 you'll get 6 parallel instances of this operation at
runtime. This operation forwards information about where it writes to
downstream. This does not write to the final file location but an
intermediate staging location.

 - "Finalize": This gets the initialized sink on the main-input and and the
information about written files from "WriteBundles" as a side-input. This
also only runs once, non-parallel. Here we're writing the intermediate
files to a final location based on the sharding template.
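
A toy model of the three phases (plain Java with made-up names; not the
actual Write/Sink classes) makes the flow concrete:

{code}
import java.util.ArrayList;
import java.util.List;

public class ThreePhaseWriteModel {
  public static void main(String[] args) {
    int parallelism = 6;

    // "Initialize": runs once per sink, e.g. picks a staging location.
    String staging = "/tmp/staging/";

    // "WriteBundles": one instance per parallel worker; each writes an
    // intermediate file and reports its name downstream.
    List<String> intermediate = new ArrayList<>();
    for (int worker = 0; worker < parallelism; worker++) {
      intermediate.add(staging + "bundle-" + worker);
    }

    // "Finalize": runs once, non-parallel; moves each intermediate file
    // to a final name derived from the shard name template.
    for (int shard = 0; shard < intermediate.size(); shard++) {
      System.out.println(intermediate.get(shard)
          + " -> /tmp/output-" + shard + "-of-" + intermediate.size());
    }
  }
}
{code}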

The problem is that Write.to() and TextSink, as well as all other sinks,
are not aware of the number of shards. If you set "withoutSharding()",
this sets the shard template to "" (empty string) and the number of
shards to 1. "WriteBundles", however, is not aware of this and will still
write 6 intermediate files if you set parallelism=6. In "Finalize" we
then copy an intermediate file to the same final location 6 times,
because the empty sharding template produces the same file name for every
shard. The end result is that you only get one of the six result shards.
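
Continuing the toy model, the collision looks like this (again with
hypothetical names; a real Finalize would copy files, here a map stands
in for the destination):

{code}
import java.util.HashMap;
import java.util.Map;

public class EmptyTemplateCollision {
  public static void main(String[] args) {
    // 6 intermediate files from WriteBundles, but shard template "" from
    // withoutSharding(): every file maps to the same final name.
    Map<String, String> destination = new HashMap<>();
    for (int shard = 0; shard < 6; shard++) {
      String finalName = "/tmp/output"; // nothing varies per shard
      destination.put(finalName, "/tmp/staging/bundle-" + shard);
    }
    // Each "copy" clobbered the previous one; only one bundle survives.
    System.out.println(destination); // {/tmp/output=/tmp/staging/bundle-5}
  }
}
{code}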

The reason this only occurs in the Flink runner is that all other runners
have special overrides for TextIO.Write and AvroIO.Write that kick in
when sharding control is required. So, for the time being, this is a
Flink runner bug, and we might have to introduce special overrides as
well until this is solved in the general case.

Cheers,
Aljoscha
{quote}
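
For reference, user code along these lines triggers the failure on the
Flink runner (a minimal sketch assuming the 0.2.0-incubating TextIO API;
paths are placeholders):

{code}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WithoutShardingRepro {
  public static void main(String[] args) {
    // Select the Flink runner and a parallelism > 1 via pipeline options
    // (exact option names vary by version).
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(TextIO.Read.from("/tmp/input.txt"))
     .apply(TextIO.Write.to("/tmp/output").withoutSharding()); // forces one shard
    p.run();
  }
}
{code}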

Original discussion:
http://mail-archives.apache.org/mod_mbox/incubator-beam-user/201606.mbox/%3CCAMdX74-VPUsNOc9NKue2A2tYXZisnHNZ7UkPWk82_TFexpnySg%40mail.gmail.com%3E

> TextIO withoutSharding causes Flink to throw IllegalStateException
> ------------------------------------------------------------------
>
>                 Key: BEAM-365
>                 URL: https://issues.apache.org/jira/browse/BEAM-365
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 0.2.0-incubating
>            Reporter: Pawel Szczur
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)