Posted to issues@beam.apache.org by "Ashwin Ramaswami (Jira)" <ji...@apache.org> on 2020/05/27 16:07:00 UTC
[jira] [Created] (BEAM-10111) Create methods in fileio to read from / write to archive files
Ashwin Ramaswami created BEAM-10111:
---------------------------------------
Summary: Create methods in fileio to read from / write to archive files
Key: BEAM-10111
URL: https://issues.apache.org/jira/browse/BEAM-10111
Project: Beam
Issue Type: Improvement
Components: io-py-files
Reporter: Ashwin Ramaswami
Assignee: Ashwin Ramaswami
It would be good to be able to read from / write to archive files (.zip, .tar) using fileio. The difference between this proposal and the existing CompressionTypes support is that this would allow converting one file -> multiple files and vice versa. Here's how it might look:
*Reading all contents from archive files:*
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/*.zip')
    | fileio.Extract()
    | fileio.MatchAll()
    | fileio.ReadMatches()
    | beam.Map(lambda x: (x.metadata.path, x.metadata._parent_archive_paths, x.read_utf8()))
)
{code}
`._parent_archive_paths` would then be an array containing the path of the parent zip file (it's an array because this could conceivably be nested, by looking for archives within archives).
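For illustration, here is a minimal, non-Beam sketch of how `_parent_archive_paths` could accumulate across nested extraction steps. The helper name `extracted_metadata` is hypothetical and exists only for this example:

{code:python}
# Hypothetical sketch: each Extract step would append the enclosing archive's
# path, so nested extraction yields a list ordered outermost-first.
def extracted_metadata(parent_archive_paths, entry_path):
    """Build the metadata dict an extracted entry might carry."""
    return {
        "path": entry_path,
        "_parent_archive_paths": list(parent_archive_paths),
    }

outer = extracted_metadata([], "hdfs://path/to/a.zip")  # the archive itself
inner = extracted_metadata(
    outer["_parent_archive_paths"] + [outer["path"]], "inner.tar")
leaf = extracted_metadata(
    inner["_parent_archive_paths"] + [inner["path"]], "data.txt")
print(leaf["_parent_archive_paths"])
# ['hdfs://path/to/a.zip', 'inner.tar']
{code}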
*Nested archive example:* (look for "`*`" inside of "`*.tar`" inside of "`*.zip`")
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/*.zip')
    | fileio.Extract()
    | fileio.MatchAll('*.tar')
    | fileio.Extract()
    | fileio.MatchAll()  # gets all entries
    | fileio.ReadMatches()
    | beam.Map(lambda x: (x.metadata.path, x.read_utf8()))
)
{code}
Note that this would involve modifying MatchAll() to take an optional glob argument, which would filter the files in the pcollection produced by the earlier stage of the pipeline.
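As a rough illustration of the glob filtering such a MatchAll() argument would need to perform, Python's stdlib `fnmatch` already implements the matching; the helper name below is illustrative, not part of fileio:

{code:python}
# Sketch of the filter MatchAll('*.tar') could apply to each file path in the
# pcollection. The function name is hypothetical, for illustration only.
from fnmatch import fnmatch

def matches_pattern(path, pattern):
    """Return True if the file's base name matches the glob pattern."""
    # Match against the base name so 'dir/inner.tar' matches '*.tar'.
    return fnmatch(path.rsplit('/', 1)[-1], pattern)

print(matches_pattern('hdfs://path/to/inner.tar', '*.tar'))  # True
print(matches_pattern('hdfs://path/to/notes.txt', '*.tar'))  # False
{code}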
*Reading from archive files and explicitly specifying the archive type (when it can't be inferred by the file extension):*
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/archive')
    | fileio.Extract(archivesystem=ArchiveSystem.TAR)
    | fileio.MatchAll(archive_path='*.txt')
    | fileio.ReadMatches()
    | beam.Map(lambda x: (x.metadata.path, x.read_utf8()))
)
{code}
`ArchiveSystem` would be a generic class, just like `FileSystem`, which would allow for different implementations of methods such as `list()` and `extract()`. It would be implemented for .zip, .tar, etc.
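A rough sketch of what this generic class could look like, built on the stdlib `zipfile` and `tarfile` modules. The class and method names follow this proposal and are hypothetical, not an existing Beam API:

{code:python}
# Hypothetical ArchiveSystem interface with two concrete implementations.
import abc
import io
import tarfile
import zipfile

class ArchiveSystem(abc.ABC):
    """Generic interface for listing and extracting archive entries."""

    @abc.abstractmethod
    def list(self, fileobj):
        """Return the entry names in the archive."""

    @abc.abstractmethod
    def extract(self, fileobj, name):
        """Return the bytes of a single entry."""

class ZipArchiveSystem(ArchiveSystem):
    def list(self, fileobj):
        with zipfile.ZipFile(fileobj) as zf:
            return zf.namelist()

    def extract(self, fileobj, name):
        with zipfile.ZipFile(fileobj) as zf:
            return zf.read(name)

class TarArchiveSystem(ArchiveSystem):
    def list(self, fileobj):
        with tarfile.open(fileobj=fileobj) as tf:
            return tf.getnames()

    def extract(self, fileobj, name):
        with tarfile.open(fileobj=fileobj) as tf:
            return tf.extractfile(name).read()

# Usage: build a small zip in memory and read it back.
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
    zf.writestr('a.txt', 'hello')
buf.seek(0)

zip_fs = ZipArchiveSystem()
print(zip_fs.list(buf))              # ['a.txt']
buf.seek(0)
print(zip_fs.extract(buf, 'a.txt'))  # b'hello'
{code}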
*Writing multiple files to an archive file:*
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/files/*.txt')
    | fileio.ReadMatches()
    | fileio.Compress(archivesystem=ArchiveSystem.ZIP)
    | textio.WriteToText("output.zip")
)
{code}
*Writing to a .tar.gz file:*
{code:python}
files = (
    p
    | fileio.MatchFiles('hdfs://path/to/files/*.txt')
    | fileio.ReadMatches()
    | fileio.Compress(archivesystem=ArchiveSystem.TAR)
    | textio.WriteToText("output.tar.gz")
)
{code}
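For reference, here is a hedged sketch of the packing step `Compress(archivesystem=ArchiveSystem.ZIP)` might perform internally, assuming elements arrive as (name, bytes) pairs; the function name is illustrative only:

{code:python}
# Hypothetical sketch: pack (name, bytes) pairs into a single in-memory zip.
import io
import zipfile

def compress_to_zip(named_contents):
    """Pack an iterable of (name, bytes) pairs into zip archive bytes."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
        for name, data in named_contents:
            zf.writestr(name, data)
    return buf.getvalue()

archive = compress_to_zip([('a.txt', b'alpha'), ('b.txt', b'beta')])
with zipfile.ZipFile(io.BytesIO(archive)) as zf:
    print(zf.namelist())  # ['a.txt', 'b.txt']
{code}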
--
This message was sent by Atlassian Jira
(v8.3.4#803005)