Posted to issues@beam.apache.org by "Boyuan Zhang (Jira)" <ji...@apache.org> on 2021/03/30 20:37:00 UTC

[jira] [Commented] (BEAM-11995) Implement FileIO/TextIO on top of Splittable DoFn

    [ https://issues.apache.org/jira/browse/BEAM-11995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311797#comment-17311797 ] 

Boyuan Zhang commented on BEAM-11995:
-------------------------------------

h3. Goal
Currently, [FileIO|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java] and [TextIO|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java] read from a [FileBasedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java] via [ReadAllViaFileBasedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java]. We want to turn ReadAllViaFileBasedSource into an SDF (Splittable DoFn) implementation to gain the benefits of dynamic splitting.

h3. Details
[ReadAllViaFileBasedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java] is a composite transform that expands into "Split into ranges" -> "Reshuffle" -> "Read ranges": [code|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java#L63-L67]. The "Read ranges" step still uses the [BoundedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java] and [BoundedReader|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L108] APIs to read from files. When converting the ReadAllViaFileBasedSource transform into an SDF implementation, we can keep using the BoundedSource and BoundedReader APIs to read file content, since they are already hooked up to multiple kinds of files: [query|https://github.com/apache/beam/search?q=ReadAllViaFileBasedSource].
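
To make the "Split into ranges" step concrete, here is a minimal sketch of cutting one file into fixed-size byte ranges before the reshuffle. It uses only the Java standard library; {{RangeSplitSketch}}, {{Range}}, {{splitIntoRanges}} and the {{desiredBundleSize}} parameter are hypothetical names chosen for illustration, not the classes or signatures used in ReadAllViaFileBasedSource.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for illustration only; this is not a Beam class.
final class RangeSplitSketch {

  /** A half-open byte range [from, to) within a single file. */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String toString() {
      return "[" + from + ", " + to + ")";
    }
  }

  /**
   * Cuts a file of fileSize bytes into consecutive ranges of at most
   * desiredBundleSize bytes. The last range may be shorter.
   */
  static List<Range> splitIntoRanges(long fileSize, long desiredBundleSize) {
    if (fileSize < 0) {
      throw new IllegalArgumentException("fileSize must be >= 0");
    }
    if (desiredBundleSize <= 0) {
      throw new IllegalArgumentException("desiredBundleSize must be > 0");
    }
    List<Range> ranges = new ArrayList<>();
    long start = 0;
    while (start < fileSize) {
      // Clamp against the remaining bytes so the end never passes fileSize.
      long end = start + Math.min(desiredBundleSize, fileSize - start);
      ranges.add(new Range(start, end));
      start = end;
    }
    return ranges;
  }
}
```

With this sketch, a 250-byte file and a desired bundle size of 100 yields three ranges; each range could then be read independently after the reshuffle. These ranges are fixed once emitted, which is the limitation that dynamic splitting is meant to remove.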

Another option is to build an SDF read for every FileBasedSource: [AvroSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java], [CompressedSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java], [TextSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java], [TFRecordSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L488]. The SDF should take a ReadableFile as input and emit the file content. Then we can replace all uses of ReadAllViaFileBasedSource with this SDF implementation.
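
As a rough illustration of what dynamic splitting buys in either option, the sketch below models an offset-range restriction that can be split while it is being read. It is plain Java with no Beam dependency; {{OffsetTrackerSketch}}, {{tryClaim}}, {{trySplit}} and {{currentEnd}} are hypothetical names that only loosely mirror the idea of a restriction tracker and should not be taken as Beam's actual API.

```java
// Hypothetical stand-in for illustration only; this is not Beam's tracker API.
final class OffsetTrackerSketch {
  private final long from;
  private long to;          // exclusive end; shrinks when a split succeeds
  private long lastClaimed; // last offset the reader successfully claimed

  OffsetTrackerSketch(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from must be <= to");
    }
    this.from = from;
    this.to = to;
    this.lastClaimed = from - 1;
  }

  /** The reader calls this before processing the record at offset. */
  boolean tryClaim(long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (offset >= to) {
      return false; // outside the (possibly shrunk) range: stop reading
    }
    lastClaimed = offset;
    return true;
  }

  /**
   * Hands the tail of the unclaimed work to someone else. Returns the
   * residual range as {from, to}, or null if nothing can be split off.
   */
  long[] trySplit(double fractionOfRemainder) {
    if (!(fractionOfRemainder >= 0.0 && fractionOfRemainder <= 1.0)) {
      throw new IllegalArgumentException("fraction must be in [0, 1]");
    }
    long firstUnclaimed = lastClaimed + 1;
    long splitPos = firstUnclaimed + (long) ((to - firstUnclaimed) * fractionOfRemainder);
    if (splitPos >= to) {
      return null;
    }
    long oldTo = to;
    to = splitPos; // the primary keeps [from, splitPos)
    return new long[] {splitPos, oldTo};
  }

  long currentEnd() {
    return to;
  }

  @Override
  public String toString() {
    return "[" + from + ", " + to + ")";
  }
}
```

In this sketch, a reader that has claimed offsets 0 through 39 of the range [0, 100) and is then asked to split at half of the remainder keeps [0, 70) and gives up [70, 100), which another worker could pick up. That rebalancing during the read is what the fixed "Split into ranges" step cannot do.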

h3. Code examples
* Build an SDF based on the BoundedSource and BoundedReader APIs: [BoundedSourceAsSDFWrapperFn|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L261]
* Build an SDF that replaces BoundedSource/BoundedReader: [ParquetIO|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L716]

> Implement FileIO/TextIO on top of Splittable DoFn
> -------------------------------------------------
>
>                 Key: BEAM-11995
>                 URL: https://issues.apache.org/jira/browse/BEAM-11995
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files
>            Reporter: Boyuan Zhang
>            Priority: P2
>



