You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by "Kirby, Adam" <ak...@gmail.com> on 2022/07/20 00:13:43 UTC

Support for reading compressed ZIP stream of multiple CSVs

Hi All,

I'm currently using pyarrow.csv.read_csv to parse a CSV stream that
originates from a ZIP of multiple CSV files. For now, I'm using a separate
implementation to do the streaming ZIP decompression, then
using pyarrow.csv.read_csv at each CSV file boundary.

I would love if there were a way to leverage pyarrow to handle the
decompression. From what I've seen in examples, a ZIP file containing a
single CSV is supported -- that is, it's possible to operate on a
compressed CSV stream -- but I wonder if it's possible to handle a
compressed stream that contains multiple files?

Thank you in advance!

Re: Support for reading compressed ZIP stream of multiple CSVs

Posted by "Kirby, Adam" <ak...@gmail.com>.
As a quick update in case anyone runs across this, this has been fixed
upstream (9.0.0) by: https://github.com/apache/arrow/pull/12977

On Thu, Jul 21, 2022 at 3:18 PM Kirby, Adam <ak...@gmail.com> wrote:

> One more follow-up here. The addition of the below statement seems to coax
> out an error. Does it appear that the filenames aren't making their way to
> the routines that extract the fields from the filenames?
>
> FWIW, this error seems to be coming from here:
> https://github.com/apache/arrow/blob/6e3f26af658bfca602e711ea326f1985b62bca1d/cpp/src/arrow/dataset/partition.cc#L511
>
> partitioning = pds.FilenamePartitioning(schema=part_schema).discover(
> schema=part_schema)
> ds_partitioned = pds.dataset(
> csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=
> partitioning,
> )
> # Traceback (most recent call last):
> # File "/zip_of_csvs_test.py", line 82, in <module>
> # ds_partitioned = pds.dataset(
> # File
> "/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py",
> line 697, in dataset
> # return _filesystem_dataset(source, **kwargs)
> # File
> "/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py",
> line 449, in _filesystem_dataset
> # return factory.finish(schema)
> # File "pyarrow/_dataset.pyx", line 1857, in
> pyarrow._dataset.DatasetFactory.finish
> # File "pyarrow/error.pxi", line 144, in
> pyarrow.lib.pyarrow_internal_check_status
> # File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
> # pyarrow.lib.ArrowInvalid: No non-null segments were available for field
> 'frequency'; couldn't infer type
>
>
>
> On Wed, Jul 20, 2022 at 11:19 PM Kirby, Adam <ak...@gmail.com> wrote:
>
>> As a follow-up, I can confirm that this appears to work very well for
>> non-partitioned data at least.
>> In my case, the data are ‘partitioned’ and while the rest of the data are
>> parsed properly, the partition fields don’t seem to be being extracted from
>> the filenames. Does it appear that I am doing something incorrectly?
>>
>> Thank you!
>>
>> —
>>
>> #!/usr/bin/env python3
>> import fsspecimport pyarrow as paimport pyarrow.csv as pcsvimport pyarrow.dataset as pds
>>
>> sample_file = (
>>     "https://firstratedata.com/_data/_deploy/stocks-complete_bundle_sample.zip"
>> )
>> schema = pa.schema(
>>     [
>>         pa.field("datetime", pa.timestamp("s")),
>>         pa.field("open", pa.float64()),
>>         pa.field("high", pa.float64()),
>>         pa.field("low", pa.float64()),
>>         pa.field("close", pa.float64()),
>>         pa.field("volume", pa.float64()),
>>     ],
>> )
>> read_opts, convert_opts = pcsv.ReadOptions(), pcsv.ConvertOptions()
>> convert_opts.column_types = schema
>> read_opts.column_names = schema.names
>> csvformat = pds.CsvFileFormat(convert_options=convert_opts, read_options=read_opts)
>>
>> fsspec_fs = fsspec.filesystem("zip", fo=fsspec.open(sample_file))
>>
>> csv_files = [_ for _ in fsspec_fs.ls("/") if _.endswith("_sample.txt")]
>> print(csv_files)# ['AAPL_1hour_sample.txt', 'AAPL_1min_sample.txt', 'AAPL_30min_sample.txt',# 'AAPL_5min_sample.txt', 'AMZN_1hour_sample.txt', 'AMZN_1min_sample.txt',# 'AMZN_30min_sample.txt', 'AMZN_5min_sample.txt', 'MSFT_1hour_sample.txt',# 'MSFT_1min_sample.txt', 'MSFT_30min_sample.txt', 'MSFT_5min_sample.txt']
>>
>> part_schema = pa.schema([("symbol", pa.string()), ("frequency", pa.string())])
>> partitioning = pds.FilenamePartitioning(schema=part_schema)
>> # confirm filenames are parsed correctly
>> print({_: str(partitioning.parse(_)) for _ in csv_files})# {#     "AAPL_1hour_sample.txt": '((symbol == "AAPL") and (frequency == "1hour"))',#     "AAPL_1min_sample.txt": '((symbol == "AAPL") and (frequency == "1min"))',#     "AAPL_30min_sample.txt": '((symbol == "AAPL") and (frequency == "30min"))',#     "AAPL_5min_sample.txt": '((symbol == "AAPL") and (frequency == "5min"))',#     "AMZN_1hour_sample.txt": '((symbol == "AMZN") and (frequency == "1hour"))',#     "AMZN_1min_sample.txt": '((symbol == "AMZN") and (frequency == "1min"))',#     "AMZN_30min_sample.txt": '((symbol == "AMZN") and (frequency == "30min"))',#     "AMZN_5min_sample.txt": '((symbol == "AMZN") and (frequency == "5min"))',#     "MSFT_1hour_sample.txt": '((symbol == "MSFT") and (frequency == "1hour"))',#     "MSFT_1min_sample.txt": '((symbol == "MSFT") and (frequency == "1min"))',#     "MSFT_30min_sample.txt": '((symbol == "MSFT") and (frequency == "30min"))',#     "MSFT_5min_sample.txt": '((symbol == "MSFT") and (frequency == "5min"))',# }
>>
>> ds_partitioned = pds.dataset(
>>     csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=partitioning,
>> )
>>
>> print(ds_partitioned.head(5))# pyarrow.Table# datetime: timestamp[s]# open: double# high: double# low: double# close: double# volume: double# symbol: string# frequency: string# ----# datetime: [[2022-04-01 04:00:00,2022-04-01 05:00:00,2022-04-01 06:00:00,2022-04-01 07:00:00,2022-04-01 08:00:00]]# open: [[175.25,175.32,175.43,175.54,175.49]]# high: [[175.88,175.38,175.72,175.6,175.52]]# low: [[175.1,175.04,175.33,174.69,173.35]]# close: [[175.26,175.31,175.5,174.82,173.6]]# volume: [[24417,13692,90057,162983,736016]]# symbol: [[null,null,null,null,null]]# frequency: [[null,null,null,null,null]]
>>
>>
>> On Wed, Jul 20, 2022 at 11:12 AM Kirby, Adam <ak...@gmail.com> wrote:
>>
>>> Micah, Great idea, thank you! I really appreciate the pointer.
>>>
>>> On Wed, Jul 20, 2022 at 12:04 AM Micah Kornfield <em...@gmail.com>
>>> wrote:
>>>
>>>> You could maybe use datasets on top of fsspec's zip file system [1]?
>>>>
>>>> [1]
>>>> https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html
>>>>
>>>> On Tuesday, July 19, 2022, Kirby, Adam <ak...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm currently using pyarrow.csv.read_csv to parse a CSV stream that
>>>>> originates from a ZIP of multiple CSV files. For now, I'm using a separate
>>>>> implementation to do the streaming ZIP decompression, then
>>>>> using pyarrow.csv.read_csv at each CSV file boundary.
>>>>>
>>>>> I would love if there were a way to leverage pyarrow to handle the
>>>>> decompression. From what I've seen in examples, a ZIP file containing a
>>>>> single CSV is supported -- that is, it's possible to operate on a
>>>>> compressed CSV stream -- but I wonder if it's possible to handle a
>>>>> compressed stream that contains multiple files?
>>>>>
>>>>> Thank you in advance!
>>>>>
>>>>

Re: Support for reading compressed ZIP stream of multiple CSVs

Posted by "Kirby, Adam" <ak...@gmail.com>.
One more follow-up here. The addition of the below statement seems to coax
out an error. Does it appear that the filenames aren't making their way to
the routines that extract the fields from the filenames?

FWIW, this error seems to be coming from here:
https://github.com/apache/arrow/blob/6e3f26af658bfca602e711ea326f1985b62bca1d/cpp/src/arrow/dataset/partition.cc#L511

partitioning = pds.FilenamePartitioning(schema=part_schema).discover(schema=
part_schema)
ds_partitioned = pds.dataset(
csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=partitioning
,
)
# Traceback (most recent call last):
# File "/zip_of_csvs_test.py", line 82, in <module>
# ds_partitioned = pds.dataset(
# File
"/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py",
line 697, in dataset
# return _filesystem_dataset(source, **kwargs)
# File
"/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py",
line 449, in _filesystem_dataset
# return factory.finish(schema)
# File "pyarrow/_dataset.pyx", line 1857, in
pyarrow._dataset.DatasetFactory.finish
# File "pyarrow/error.pxi", line 144, in
pyarrow.lib.pyarrow_internal_check_status
# File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
# pyarrow.lib.ArrowInvalid: No non-null segments were available for field
'frequency'; couldn't infer type



On Wed, Jul 20, 2022 at 11:19 PM Kirby, Adam <ak...@gmail.com> wrote:

> As a follow-up, I can confirm that this appears to work very well for
> non-partitioned data at least.
> In my case, the data are ‘partitioned’ and while the rest of the data are
> parsed properly, the partition fields don’t seem to be being extracted from
> the filenames. Does it appear that I am doing something incorrectly?
>
> Thank you!
>
> —
>
> #!/usr/bin/env python3
> import fsspecimport pyarrow as paimport pyarrow.csv as pcsvimport pyarrow.dataset as pds
>
> sample_file = (
>     "https://firstratedata.com/_data/_deploy/stocks-complete_bundle_sample.zip"
> )
> schema = pa.schema(
>     [
>         pa.field("datetime", pa.timestamp("s")),
>         pa.field("open", pa.float64()),
>         pa.field("high", pa.float64()),
>         pa.field("low", pa.float64()),
>         pa.field("close", pa.float64()),
>         pa.field("volume", pa.float64()),
>     ],
> )
> read_opts, convert_opts = pcsv.ReadOptions(), pcsv.ConvertOptions()
> convert_opts.column_types = schema
> read_opts.column_names = schema.names
> csvformat = pds.CsvFileFormat(convert_options=convert_opts, read_options=read_opts)
>
> fsspec_fs = fsspec.filesystem("zip", fo=fsspec.open(sample_file))
>
> csv_files = [_ for _ in fsspec_fs.ls("/") if _.endswith("_sample.txt")]
> print(csv_files)# ['AAPL_1hour_sample.txt', 'AAPL_1min_sample.txt', 'AAPL_30min_sample.txt',# 'AAPL_5min_sample.txt', 'AMZN_1hour_sample.txt', 'AMZN_1min_sample.txt',# 'AMZN_30min_sample.txt', 'AMZN_5min_sample.txt', 'MSFT_1hour_sample.txt',# 'MSFT_1min_sample.txt', 'MSFT_30min_sample.txt', 'MSFT_5min_sample.txt']
>
> part_schema = pa.schema([("symbol", pa.string()), ("frequency", pa.string())])
> partitioning = pds.FilenamePartitioning(schema=part_schema)
> # confirm filenames are parsed correctly
> print({_: str(partitioning.parse(_)) for _ in csv_files})# {#     "AAPL_1hour_sample.txt": '((symbol == "AAPL") and (frequency == "1hour"))',#     "AAPL_1min_sample.txt": '((symbol == "AAPL") and (frequency == "1min"))',#     "AAPL_30min_sample.txt": '((symbol == "AAPL") and (frequency == "30min"))',#     "AAPL_5min_sample.txt": '((symbol == "AAPL") and (frequency == "5min"))',#     "AMZN_1hour_sample.txt": '((symbol == "AMZN") and (frequency == "1hour"))',#     "AMZN_1min_sample.txt": '((symbol == "AMZN") and (frequency == "1min"))',#     "AMZN_30min_sample.txt": '((symbol == "AMZN") and (frequency == "30min"))',#     "AMZN_5min_sample.txt": '((symbol == "AMZN") and (frequency == "5min"))',#     "MSFT_1hour_sample.txt": '((symbol == "MSFT") and (frequency == "1hour"))',#     "MSFT_1min_sample.txt": '((symbol == "MSFT") and (frequency == "1min"))',#     "MSFT_30min_sample.txt": '((symbol == "MSFT") and (frequency == "30min"))',#     "MSFT_5min_sample.txt": '((symbol == "MSFT") and (frequency == "5min"))',# }
>
> ds_partitioned = pds.dataset(
>     csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=partitioning,
> )
>
> print(ds_partitioned.head(5))# pyarrow.Table# datetime: timestamp[s]# open: double# high: double# low: double# close: double# volume: double# symbol: string# frequency: string# ----# datetime: [[2022-04-01 04:00:00,2022-04-01 05:00:00,2022-04-01 06:00:00,2022-04-01 07:00:00,2022-04-01 08:00:00]]# open: [[175.25,175.32,175.43,175.54,175.49]]# high: [[175.88,175.38,175.72,175.6,175.52]]# low: [[175.1,175.04,175.33,174.69,173.35]]# close: [[175.26,175.31,175.5,174.82,173.6]]# volume: [[24417,13692,90057,162983,736016]]# symbol: [[null,null,null,null,null]]# frequency: [[null,null,null,null,null]]
>
>
> On Wed, Jul 20, 2022 at 11:12 AM Kirby, Adam <ak...@gmail.com> wrote:
>
>> Micah, Great idea, thank you! I really appreciate the pointer.
>>
>> On Wed, Jul 20, 2022 at 12:04 AM Micah Kornfield <em...@gmail.com>
>> wrote:
>>
>>> You could maybe use datasets on top of fsspec's zip file system [1]?
>>>
>>> [1]
>>> https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html
>>>
>>> On Tuesday, July 19, 2022, Kirby, Adam <ak...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm currently using pyarrow.csv.read_csv to parse a CSV stream that
>>>> originates from a ZIP of multiple CSV files. For now, I'm using a separate
>>>> implementation to do the streaming ZIP decompression, then
>>>> using pyarrow.csv.read_csv at each CSV file boundary.
>>>>
>>>> I would love if there were a way to leverage pyarrow to handle the
>>>> decompression. From what I've seen in examples, a ZIP file containing a
>>>> single CSV is supported -- that is, it's possible to operate on a
>>>> compressed CSV stream -- but I wonder if it's possible to handle a
>>>> compressed stream that contains multiple files?
>>>>
>>>> Thank you in advance!
>>>>
>>>

Re: Support for reading compressed ZIP stream of multiple CSVs

Posted by "Kirby, Adam" <ak...@gmail.com>.
As a follow-up, I can confirm that this appears to work very well for
non-partitioned data at least.
In my case, the data are ‘partitioned’ and while the rest of the data are
parsed properly, the partition fields don’t seem to be being extracted from
the filenames. Does it appear that I am doing something incorrectly?

Thank you!

—

#!/usr/bin/env python3
import fsspecimport pyarrow as paimport pyarrow.csv as pcsvimport
pyarrow.dataset as pds

sample_file = (
    "https://firstratedata.com/_data/_deploy/stocks-complete_bundle_sample.zip"
)
schema = pa.schema(
    [
        pa.field("datetime", pa.timestamp("s")),
        pa.field("open", pa.float64()),
        pa.field("high", pa.float64()),
        pa.field("low", pa.float64()),
        pa.field("close", pa.float64()),
        pa.field("volume", pa.float64()),
    ],
)
read_opts, convert_opts = pcsv.ReadOptions(), pcsv.ConvertOptions()
convert_opts.column_types = schema
read_opts.column_names = schema.names
csvformat = pds.CsvFileFormat(convert_options=convert_opts,
read_options=read_opts)

fsspec_fs = fsspec.filesystem("zip", fo=fsspec.open(sample_file))

csv_files = [_ for _ in fsspec_fs.ls("/") if _.endswith("_sample.txt")]
print(csv_files)# ['AAPL_1hour_sample.txt', 'AAPL_1min_sample.txt',
'AAPL_30min_sample.txt',# 'AAPL_5min_sample.txt',
'AMZN_1hour_sample.txt', 'AMZN_1min_sample.txt',#
'AMZN_30min_sample.txt', 'AMZN_5min_sample.txt',
'MSFT_1hour_sample.txt',# 'MSFT_1min_sample.txt',
'MSFT_30min_sample.txt', 'MSFT_5min_sample.txt']

part_schema = pa.schema([("symbol", pa.string()), ("frequency", pa.string())])
partitioning = pds.FilenamePartitioning(schema=part_schema)
# confirm filenames are parsed correctly
print({_: str(partitioning.parse(_)) for _ in csv_files})# {#
"AAPL_1hour_sample.txt": '((symbol == "AAPL") and (frequency ==
"1hour"))',#     "AAPL_1min_sample.txt": '((symbol == "AAPL") and
(frequency == "1min"))',#     "AAPL_30min_sample.txt": '((symbol ==
"AAPL") and (frequency == "30min"))',#     "AAPL_5min_sample.txt":
'((symbol == "AAPL") and (frequency == "5min"))',#
"AMZN_1hour_sample.txt": '((symbol == "AMZN") and (frequency ==
"1hour"))',#     "AMZN_1min_sample.txt": '((symbol == "AMZN") and
(frequency == "1min"))',#     "AMZN_30min_sample.txt": '((symbol ==
"AMZN") and (frequency == "30min"))',#     "AMZN_5min_sample.txt":
'((symbol == "AMZN") and (frequency == "5min"))',#
"MSFT_1hour_sample.txt": '((symbol == "MSFT") and (frequency ==
"1hour"))',#     "MSFT_1min_sample.txt": '((symbol == "MSFT") and
(frequency == "1min"))',#     "MSFT_30min_sample.txt": '((symbol ==
"MSFT") and (frequency == "30min"))',#     "MSFT_5min_sample.txt":
'((symbol == "MSFT") and (frequency == "5min"))',# }

ds_partitioned = pds.dataset(
    csv_files, format=csvformat, filesystem=fsspec_fs,
partitioning=partitioning,
)

print(ds_partitioned.head(5))# pyarrow.Table# datetime: timestamp[s]#
open: double# high: double# low: double# close: double# volume:
double# symbol: string# frequency: string# ----# datetime:
[[2022-04-01 04:00:00,2022-04-01 05:00:00,2022-04-01
06:00:00,2022-04-01 07:00:00,2022-04-01 08:00:00]]# open:
[[175.25,175.32,175.43,175.54,175.49]]# high:
[[175.88,175.38,175.72,175.6,175.52]]# low:
[[175.1,175.04,175.33,174.69,173.35]]# close:
[[175.26,175.31,175.5,174.82,173.6]]# volume:
[[24417,13692,90057,162983,736016]]# symbol:
[[null,null,null,null,null]]# frequency: [[null,null,null,null,null]]


On Wed, Jul 20, 2022 at 11:12 AM Kirby, Adam <ak...@gmail.com> wrote:

> Micah, Great idea, thank you! I really appreciate the pointer.
>
> On Wed, Jul 20, 2022 at 12:04 AM Micah Kornfield <em...@gmail.com>
> wrote:
>
>> You could maybe use datasets on top of fsspec's zip file system [1]?
>>
>> [1]
>> https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html
>>
>> On Tuesday, July 19, 2022, Kirby, Adam <ak...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I'm currently using pyarrow.csv.read_csv to parse a CSV stream that
>>> originates from a ZIP of multiple CSV files. For now, I'm using a separate
>>> implementation to do the streaming ZIP decompression, then
>>> using pyarrow.csv.read_csv at each CSV file boundary.
>>>
>>> I would love if there were a way to leverage pyarrow to handle the
>>> decompression. From what I've seen in examples, a ZIP file containing a
>>> single CSV is supported -- that is, it's possible to operate on a
>>> compressed CSV stream -- but I wonder if it's possible to handle a
>>> compressed stream that contains multiple files?
>>>
>>> Thank you in advance!
>>>
>>

Re: Support for reading compressed ZIP stream of multiple CSVs

Posted by "Kirby, Adam" <ak...@gmail.com>.
Micah, Great idea, thank you! I really appreciate the pointer.

On Wed, Jul 20, 2022 at 12:04 AM Micah Kornfield <em...@gmail.com>
wrote:

> You could maybe use datasets on top of fsspec's zip file system [1]?
>
> [1]
> https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html
>
> On Tuesday, July 19, 2022, Kirby, Adam <ak...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm currently using pyarrow.csv.read_csv to parse a CSV stream that
>> originates from a ZIP of multiple CSV files. For now, I'm using a separate
>> implementation to do the streaming ZIP decompression, then
>> using pyarrow.csv.read_csv at each CSV file boundary.
>>
>> I would love if there were a way to leverage pyarrow to handle the
>> decompression. From what I've seen in examples, a ZIP file containing a
>> single CSV is supported -- that is, it's possible to operate on a
>> compressed CSV stream -- but I wonder if it's possible to handle a
>> compressed stream that contains multiple files?
>>
>> Thank you in advance!
>>
>

Re: Support for reading compressed ZIP stream of multiple CSVs

Posted by Micah Kornfield <em...@gmail.com>.
You could maybe use datasets on top of fsspec's zip file system [1]?

[1]
https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html

On Tuesday, July 19, 2022, Kirby, Adam <ak...@gmail.com> wrote:

> Hi All,
>
> I'm currently using pyarrow.csv.read_csv to parse a CSV stream that
> originates from a ZIP of multiple CSV files. For now, I'm using a separate
> implementation to do the streaming ZIP decompression, then
> using pyarrow.csv.read_csv at each CSV file boundary.
>
> I would love if there were a way to leverage pyarrow to handle the
> decompression. From what I've seen in examples, a ZIP file containing a
> single CSV is supported -- that is, it's possible to operate on a
> compressed CSV stream -- but I wonder if it's possible to handle a
> compressed stream that contains multiple files?
>
> Thank you in advance!
>