Posted to jira@arrow.apache.org by "Lance Dacey (Jira)" <ji...@apache.org> on 2020/11/14 01:18:00 UTC

[jira] [Updated] (ARROW-10517) [Python] Unable to read/write Parquet datasets with fsspec on Azure Blob

     [ https://issues.apache.org/jira/browse/ARROW-10517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lance Dacey updated ARROW-10517:
--------------------------------
    Description: 
 

 

If I downgrade adlfs to 0.2.5 and azure-storage-blob to 2.1.0, and then upgrade fsspec (fsspec 0.6.2 raises errors related to the detail kwarg, so I need a newer version):

 
{code:python}
# adal==1.2.5
# adlfs==0.2.5
# fsspec==0.7.4
# pandas==1.1.3
# pyarrow==2.0.0
# azure-storage-blob==2.1.0
# azure-storage-common==2.1.0

import pyarrow as pa
import pyarrow.dataset as ds
import fsspec
from pyarrow.dataset import DirectoryPartitioning

# `base` holds the storage account credentials; `table` is an existing pyarrow Table
fs = fsspec.filesystem(protocol='abfs', 
                       account_name=base.login, 
                       account_key=base.password)


ds.write_dataset(data=table, 
                 base_dir="dev/test7", 
                 basename_template=None, 
                 format="parquet",
                 partitioning=DirectoryPartitioning(pa.schema([("year", pa.string()), ("month", pa.string()), ("day", pa.string())])), 
                 schema=table.schema,
                 filesystem=fs, 
                )
{code}
I think this is due to how early versions of adlfs implement mkdir(). However, I use write_to_dataset and write_table all of the time without problems, so I am not sure why this would fail here.
{code:python}
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-40-bb38d83f896e> in <module>
     13 
     14 
---> 15 ds.write_dataset(data=table, 
     16                  base_dir="dev/test7",
     17                  basename_template=None,

/opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in write_dataset(data, base_dir, basename_template, format, partitioning, schema, filesystem, file_options, use_threads)
    771     filesystem, _ = _ensure_fs(filesystem)
    772 
--> 773     _filesystemdataset_write(
    774         data, base_dir, basename_template, schema,
    775         filesystem, partitioning, file_options, use_threads,

/opt/conda/lib/python3.8/site-packages/pyarrow/_dataset.pyx in pyarrow._dataset._filesystemdataset_write()

/opt/conda/lib/python3.8/site-packages/pyarrow/_fs.pyx in pyarrow._fs._cb_create_dir()

/opt/conda/lib/python3.8/site-packages/pyarrow/fs.py in create_dir(self, path, recursive)
    226     def create_dir(self, path, recursive):
    227         # mkdir also raises FileNotFoundError when base directory is not found
--> 228         self.fs.mkdir(path, create_parents=recursive)
    229 
    230     def delete_dir(self, path):

/opt/conda/lib/python3.8/site-packages/adlfs/core.py in mkdir(self, path, delimiter, exists_ok, **kwargs)
    561             else:
    562                 ## everything else
--> 563                 raise RuntimeError(f"Cannot create {container_name}{delimiter}{path}.")
    564         else:
    565             if container_name in self.ls("") and path:

RuntimeError: Cannot create dev/test7/2020/01/28.
{code}
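For reference, the mkdir call that fails inside pyarrow can be reproduced directly against the fsspec filesystem. This is just a sketch, reusing the fs object from above and the path from the error message:

{code:python}
# Sketch: call mkdir directly, mirroring pyarrow's wrapper
# (self.fs.mkdir(path, create_parents=recursive)). If this raises the same
# RuntimeError, the failure comes from adlfs 0.2.5 rather than from the
# dataset writer itself.
try:
    fs.mkdir("dev/test7/2020/01/28", create_parents=True)
except RuntimeError as exc:
    print("adlfs mkdir failed:", exc)
{code}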
 
Next, if I try to read a dataset (keep in mind that this works with read_table and ParquetDataset):

{code:python}
ds.dataset(source="dev/staging/evaluations", 
           format="parquet", 
           partitioning="hive",
           exclude_invalid_files=False,
           filesystem=fs
          )
{code}
 
This does not seem to respect the fsspec filesystem connected to Azure Blob; the path exists, but pyarrow raises FileNotFoundError:
{code:python}
---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
<ipython-input-41-4de65fe95db7> in <module>
----> 1 ds.dataset(source="dev/staging/evaluations", 
      2            format="parquet",
      3            partitioning="hive",
      4            exclude_invalid_files=False,
      5            filesystem=fs

/opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes)
    669     # TODO(kszucs): support InMemoryDataset for a table input
    670     if _is_path_like(source):
--> 671         return _filesystem_dataset(source, **kwargs)
    672     elif isinstance(source, (tuple, list)):
    673         if all(_is_path_like(elem) for elem in source):

/opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
    426         fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
    427     else:
--> 428         fs, paths_or_selector = _ensure_single_source(source, filesystem)
    429 
    430     options = FileSystemFactoryOptions(

/opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in _ensure_single_source(path, filesystem)
    402         paths_or_selector = [path]
    403     else:
--> 404         raise FileNotFoundError(path)
    405 
    406     return filesystem, paths_or_selector

FileNotFoundError: dev/staging/evaluations
{code}
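As an extra debugging step (not part of the original code), the fsspec filesystem can be wrapped explicitly the way pyarrow wraps it internally, to see what the filesystem layer reports for the base directory. A sketch:

{code:python}
# Sketch: wrap the same fsspec filesystem with pyarrow's handler and inspect
# what it reports for the dataset base directory.
from pyarrow.fs import PyFileSystem, FSSpecHandler

pa_fs = PyFileSystem(FSSpecHandler(fs))
infos = pa_fs.get_file_info(["dev/staging/evaluations"])
print(infos[0])  # FileType.NotFound here would explain the FileNotFoundError above
{code}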

Reading *does* work, though, when I list the blobs first and pass them to ds.dataset explicitly:

{code:python}
# wasb: a separate azure-storage-blob 2.1 client, used only to list blob names
blobs = wasb.list_blobs(container_name="dev", prefix="staging/evaluations")

dataset = ds.dataset(source=["dev/" + blob.name for blob in blobs], 
                     format="parquet", 
                     partitioning="hive",
                     exclude_invalid_files=False,
                     filesystem=fs)
{code}
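Presumably the same list could be built from the fsspec filesystem itself instead of the separate SDK client; a sketch, assuming adlfs returns container-prefixed paths like "dev/staging/...":

{code:python}
# Sketch: build the file list with fsspec instead of the azure SDK client
# (assumes adlfs find() returns paths already prefixed with the container name).
files = fs.find("dev/staging/evaluations")

dataset = ds.dataset(source=files, 
                     format="parquet", 
                     partitioning="hive",
                     exclude_invalid_files=False,
                     filesystem=fs)
{code}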

Next, if I downgrade to pyarrow 1.0.1, I am able to read datasets (but pyarrow 1.0.1 has no ds.write_dataset):

{code:python}
# adal==1.2.5
# adlfs==0.2.5
# azure-storage-blob==2.1.0
# azure-storage-common==2.1.0
# fsspec==0.7.4
# pandas==1.1.3
# pyarrow==1.0.1

# reusing ds and fs from the examples above
dataset = ds.dataset("dev/staging/evaluations", format="parquet", filesystem=fs)
dataset.to_table().to_pandas()
{code}
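For writing under pyarrow 1.0.1 I can fall back to the legacy pyarrow.parquet writer, which I already use with this filesystem. A rough sketch (the partition columns are just an example matching the year/month/day layout above):

{code:python}
# Sketch: legacy dataset writer, available on pyarrow 1.0.1.
import pyarrow.parquet as pq

pq.write_to_dataset(table,
                    root_path="dev/test7",
                    partition_cols=["year", "month", "day"],
                    filesystem=fs)
{code}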

  was:
 

 

If I downgrade adlfs to 0.2.5 and azure-storage-blob to 2.1.0, and then upgrade fsspec (fsspec 0.6.2 raises errors related to the detail kwarg, so I need a newer version):

 
{code:python}
pa.dataset.write_dataset(data=table, 
                         base_dir="test/test7", 
                         basename_template=None, 
                         format="parquet",
                         partitioning=DirectoryPartitioning(pa.schema([("year", pa.int64()), ("month", pa.int16()), ("day", pa.int16())])), 
                         schema=table.schema,
                         filesystem=blob_fs)
{code}
 
{code:python}
    226     def create_dir(self, path, recursive):
    227         # mkdir also raises FileNotFoundError when base directory is not found
--> 228         self.fs.mkdir(path, create_parents=recursive)
{code}
 

It does not look like this adlfs version has a working mkdir. However, the output of fs.find() returns a dictionary as expected:
{code:python}
selected_files = blob_fs.find(
    "test/test6", maxdepth=None, withdirs=True, detail=True
)
{code}
 

Now, if I install the latest version of adlfs, it upgrades my blob SDK to 12.5 (unfortunately, I cannot use this in production since Airflow requires azure-storage-blob 2.1, so this is only for testing purposes):
{code}
Successfully installed adlfs-0.5.5 azure-storage-blob-12.5.0
{code}
 

Now fs.find() returns a list instead of a dictionary (although I am able to use fs.mkdir()):
{code:python}
['test/test6/year=2020',
 'test/test6/year=2020/month=11',
 'test/test6/year=2020/month=11/day=1',
 'test/test6/year=2020/month=11/day=1/8ee6c66320ca47908c37f112f0cffd6c.parquet',
 'test/test6/year=2020/month=11/day=1/ef753f016efc44b7b0f0800c35d084fc.parquet']
{code}
 

This causes issues later when I try to read a dataset (pyarrow's code still expects a dictionary):
{code:python}
dataset = ds.dataset("test/test5", filesystem=blob_fs, format="parquet")
{code}
{code:python}
--> 221         for path, info in selected_files.items():
    222             infos.append(self._create_file_info(path, info))
    223 

AttributeError: 'list' object has no attribute 'items'
{code}
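The mismatch can be seen by calling find() directly; a sketch, reusing the blob_fs defined at the end of this description:

{code:python}
# Sketch: pyarrow's wrapper calls find(..., detail=True) and iterates .items(),
# but adlfs 0.5.5 appears to return a list here instead of a dict.
out = blob_fs.find("test/test5", maxdepth=None, withdirs=True, detail=True)
print(type(out))  # list on adlfs 0.5.5; pyarrow expects a dict
{code}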
 

I am still able to read individual files:
{code:python}
dataset = ds.dataset("test/test4/year=2020/month=11/2020-11.parquet", filesystem=blob_fs, format="parquet")
{code}
 And I can read the dataset if I pass in a list of blob names "manually":

 
{code:python}
blobs = wasb.list_blobs(container_name="test", prefix="test4")

dataset = ds.dataset(source=["test/" + blob.name for blob in blobs], 
                     format="parquet", 
                     partitioning="hive",
                     filesystem=blob_fs)
{code}
 

For all of my examples, blob_fs is defined by:
{code:python}
blob_fs = fsspec.filesystem(
    protocol="abfs", account_name=base.login, account_key=base.password
)
{code}
 


> [Python] Unable to read/write Parquet datasets with fsspec on Azure Blob
> ------------------------------------------------------------------------
>
>                 Key: ARROW-10517
>                 URL: https://issues.apache.org/jira/browse/ARROW-10517
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 2.0.0
>         Environment: Ubuntu 18.04
>            Reporter: Lance Dacey
>            Priority: Major
>              Labels: azureblob, dataset, dataset-parquet-read, dataset-parquet-write, fsspec
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)