Posted to user@arrow.apache.org by Nikhil Makan <ni...@gmail.com> on 2022/09/09 03:26:13 UTC

[Python] - Dataset API - What's happening under the hood?

Hi There,

I have been experimenting with Tabular Datasets
<https://arrow.apache.org/docs/python/dataset.html> for data that can be
larger than memory and had a few questions related to what's going on
under the hood and how to work with it (I understand it is still
experimental).

*Question 1: Reading Data from Azure Blob Storage*
Now I know the filesystems don't fully support this yet, but there is an
fsspec-compatible library (adlfs), shown in the file system example
<https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow>,
which I have used. The example below uses the NYC taxi dataset: I pull the
whole dataset through and write it to disk in the Feather format.

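import adlfs
import pyarrow.dataset as ds

# Public Azure Open Datasets storage account.
fs = adlfs.AzureBlobFileSystem(account_name='azureopendatastorage')

# Discover the Parquet files under the container path.
dataset = ds.dataset('nyctlc/green/', filesystem=fs, format='parquet')

# Stream the whole dataset and write it locally as Feather files.
scanner = dataset.scanner()
ds.write_dataset(scanner, 'taxinyc/green/feather/', format='feather')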

This could be something on the Azure side, but I find I am bottlenecked
on download speed, and I have noticed that if I spin up multiple Python
sessions (or in my case interactive windows) I can increase my throughput.
Hence I can download each year of the taxinyc dataset in a separate
interactive window and increase the bandwidth consumed. The tabular
dataset <https://arrow.apache.org/docs/python/dataset.html> documentation
notes 'optionally parallel reading.' Do you know how I can control this,
or perhaps control the number of concurrent connections? Or does this have
nothing to do with Arrow and sit purely on the Azure side? I have
increased the I/O thread count from the default 8 to 16 and saw no
difference, but could still spin up more interactive windows to maximise
bandwidth.

*Question 2: Reading Filtered Data from Azure Blob Storage*
Unfortunately I don't quite have a repeatable example here. I am using
the same data as above, only this time each year is stored as a Feather
file instead of a Parquet file, uploaded to my own Azure blob storage
account.
I am trying to read a subset of this data from the blob storage by
selecting columns and filtering the data. The final result should be a
dataframe that takes up around 240 MB of memory (I have tested this by
working with the data locally). However, when I run this against the
Azure blob storage it takes over an hour, and it's clear it's downloading
a lot more data than I would have thought. Given that the files are in
the Feather format, which supports random access, I would have thought I
would only have to download the 240 MB?

Is there more going on in the background? Perhaps I am using this
incorrectly?

import adlfs
import pyarrow.dataset as ds

connection_string = ''  # left blank; fill in for your own storage account
fs = adlfs.AzureBlobFileSystem(connection_string=connection_string)

# Pass filesystem=fs so the path is resolved in blob storage rather than
# on the local disk.
ds_f = ds.dataset("taxinyc/green/feather/", filesystem=fs, format='feather')

df = (
    ds_f
    .scanner(
        columns={  # Selections and projections
            'passengerCount': ds.field('passengerCount') * 1000,
            'tripDistance': ds.field('tripDistance'),
        },
        filter=(ds.field('vendorID') == 1)
    )
    .to_table()
    .to_pandas()
)

df.info()

*Question 3: How is memory mapping being applied?*
Does the Dataset API make use of memory mapping? Is my understanding
correct that memory mapping is only intended for large data stored on a
local file system, whereas data stored on a cloud file system in the
Feather format effectively cannot be memory mapped?

*Question 4: Projections*
I noticed that when projecting a column in the scanner function I am
unable to use compute functions (I get a TypeError: only other expressions
allowed as arguments), yet I am able to multiply the column using standard
Python arithmetic.

This works:

'passengerCount': ds.field('passengerCount') * 1000,

This raises the TypeError:

'passengerCount': pc.multiply(ds.field('passengerCount'), 1000),

Is the operator form the correct approach, or am I meant to process this
with an iterator over record batches
<https://arrow.apache.org/docs/python/dataset.html#iterative-out-of-core-or-streaming-reads>
to do this out of core? Is it actually even doing it out of core with
"*1000"?

Thanks for your help in advance. I have been following the Arrow project
for the last two years but have only recently decided to dive into it in
depth to explore it for various use cases. I am particularly interested in
out-of-core data processing and in interacting with cloud storage to
retrieve only a selection of data from Feather files. Hopefully at some
point when I have enough knowledge I can contribute to this amazing project.

Kind regards
Nikhil Makan

Re: [Python] - Dataset API - What's happening under the hood?

Posted by Jacob Quinn <qu...@gmail.com>.
Yes, it already supports concurrent io when downloading or uploading data.
You can configure it with an initial threshold over which it will switch to
concurrent download/upload, and there are also options to configure how big
each "part" is and how many concurrent tasks to allow at a single time.

For downloads, we rely on doing an initial HEAD request on the object to
get the total size (via Content-Length header), then we do byte Range
requests according to user-provided concurrency options.
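
The download flow described above might look roughly like this in Python.
This is only a sketch, not CloudStore.jl's actual code: plan_byte_ranges and
download are hypothetical names, and an in-memory bytes object stands in for
the remote object, so len(blob) plays the role of the Content-Length from
the HEAD request and slicing plays the role of a ranged GET.

```python
from concurrent.futures import ThreadPoolExecutor


def plan_byte_ranges(total_size, part_size):
    """Split [0, total_size) into inclusive (start, end) byte ranges."""
    if part_size <= 0:
        raise ValueError("part_size must be positive")
    ranges = []
    start = 0
    while start < total_size:
        end = min(start + part_size, total_size) - 1
        ranges.append((start, end))
        start = end + 1
    return ranges


def download(blob, part_size, max_concurrency):
    """Fetch every part concurrently and reassemble the parts in order."""
    total_size = len(blob)  # stand-in for Content-Length from a HEAD request

    def fetch(byte_range):
        start, end = byte_range
        # Stand-in for a GET carrying "Range: bytes=<start>-<end>".
        return blob[start:end + 1]

    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # map() yields results in submission order, so parts stay ordered.
        parts = list(pool.map(fetch, plan_byte_ranges(total_size, part_size)))
    return b"".join(parts)


print(plan_byte_ranges(10, 4))
print(download(b"0123456789", part_size=4, max_concurrency=2))
```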

For uploads, we use the cloud-specific multipart upload APIs; so you
upload/commit each part, then at the end you commit all the parts in a
final step.

-Jacob

On Sun, Oct 9, 2022 at 6:28 PM Nikhil Makan <ni...@gmail.com> wrote:

> Thanks Jacob for the comments. Appreciate it.
>
> Out of interest, does the cloud storage interface you are working on for
> Julia support concurrent I/O to improve performance when downloading or
> uploading data? I know that with pyarrow, for blob storage, we have to
> use an fsspec-compliant library, adlfs. However, concurrent I/O is
> still in the works for that library.
>
> Kind regards
> Nikhil Makan
>
> On Wed, Oct 5, 2022 at 7:49 PM Jacob Quinn <qu...@gmail.com> wrote:
>
>> Sorry for the late reply, but thought I'd chime in with a thought or two,
>> as I've had the chance to work on both the Arrow.jl Julia implementation as
>> well as recently working on a consistent cloud storage interface (for S3,
>> Azure, and planned GCP, also for Julia;
>> https://github.com/JuliaServices/CloudStore.jl).
>>
>> First, to clarify, all cloud storage providers support "partial" reads in
>> the form of "ranged" HTTP requests (i.e. with a "Range" header like
>> "Range: bytes=0-9"). So for a single "object" in a cloud store, you
>> can request that specific byte ranges of that object be returned.
>>
>> How would that interact with stored data files? Well, for the Arrow
>> IPC/Feather format, you could potentially do a series of these "range"
>> requests to read a single Feather file's flatbuffer metadata, which
>> contains the specific byte offsets of columns within the data file. So in
>> theory, it should be fairly straightforward to apply a kind of "column
>> selection" operation where only specific columns are actually downloaded
>> from the cloud store, avoiding a download of the entire file.
>>
>> For other data formats? It's generally not as applicable, since we don't
>> have this kind of specific byte information about where certain
>> rows/columns live within a single object.
>>
>> On the other hand, the parquet format supports a partitioning scheme that
>> *IS* more amenable to "partial reads", but in a slightly different way.
>> Instead of using HTTP Range requests, specific columns or row batches of
>> columns are stored as separate *objects* in the cloud store. And so by
>> doing a "list" type of operation on all "objects" in the store, and
>> reading the overall metadata of the parquet data, we could similarly do a
>> "column selection" kind of operation by only downloading the specific
>> *objects* from the cloud store that correspond to the desired columns.
>>
>> Hopefully that provides a little bit of clarity?
>>
>> This is somewhat the overall vision that we're working towards with the
>> Julia implementation to hopefully provide really efficient interop with
>> cloud-stored data.
>>
>> -Jacob Quinn
>>
>>
>> On Sun, Sep 18, 2022 at 7:47 PM Nikhil Makan <ni...@gmail.com>
>> wrote:
>>
>>> Thanks Aldrin for the response on this.
>>>
>>> Question 1:
>>> For reference to anyone else who reads this, it appears adlfs does not
>>> support concurrent io and this is currently being developed.
>>> https://github.com/fsspec/adlfs/issues/268
>>>
>>> Question 2:
>>> Noted your points. I am using block blobs. If I am understanding you
>>> correctly are you suggesting just splitting the data up into separate
>>> blobs? This way if I filter the data it only downloads the blobs that are
>>> required? This would seem to only work if you know beforehand what the
>>> filter could be so you can split your data accordingly. However, if you
>>> wanted to return two columns of all the data I assume this would still
>>> result in all the blobs being downloaded. I also came across this
>>> https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-query-acceleration-how-to?tabs=python.
>>> However this is only for csv/json files.
>>>
>>> Are you aware of partial reading that takes advantage of a columnar
>>> format such as Arrow/Parquet being implemented for any other storage,
>>> such as Google Cloud Storage or Amazon S3? I know PyArrow has native
>>> support for GCS and S3; however, I ran this test example against S3 and
>>> saw no real improvement. It seems to be the same issue, where the whole
>>> file is downloaded.
>>>
>>> import pyarrow.dataset as ds
>>> ds_f = ds.dataset(
>>> "s3://voltrondata-labs-datasets/nyc-taxi/year=2019/month=1")
>>> ds_f.head(10)
>>>
>>> df = (
>>>     ds_f
>>>     .scanner(
>>>         columns={ # Selections and Projections
>>>             'passengerCount': ds.field(('passengerCount')),
>>>         },
>>>     )
>>>     .to_table()
>>>     .to_pandas()
>>> )
>>> df.info()
>>>
>>> Question 3:
>>> Thanks, noted.
>>>
>>> Question 4:
>>> Tried this:
>>> 'passengerCount': pc.multiply(ds.field('passengerCount'), pa.scalar(1000))
>>> Same issue -> TypeError: only other expressions allowed as arguments
>>>
>>> However, it works with this:
>>> 'passengerCount': pc.multiply(ds.field('passengerCount'), pc.scalar(1000))
>>>
>>> This works as well, as noted previously, so I assume the Python operators
>>> are mapped across, similar to how using the operators on a numpy array or
>>> pandas series just executes np.multiply or Series.multiply in the
>>> background.
>>> 'passengerCount': ds.field('passengerCount') * 1000,
>>>
>>> Kind regards
>>> Nikhil Makan
>>>
>>> On Fri, Sep 16, 2022 at 12:28 PM Aldrin <ak...@ucsc.edu> wrote:
>>>
>>>> (oh, sorry I misread `pa.scalar` as `pc.scalar`, so please try
>>>> `pyarrow.scalar` per the documentation)
>>>>
>>>> Aldrin Montana
>>>> Computer Science PhD Student
>>>> UC Santa Cruz
>>>>
>>>>
>>>> On Thu, Sep 15, 2022 at 5:26 PM Aldrin <ak...@ucsc.edu> wrote:
>>>>
>>>>> For Question 2:
>>>>> At a glance, I don't see anything in adlfs or azure that is able to do
>>>>> partial reads of a blob. If you're using block blobs, then likely you would
>>>>> want to store blocks of your file as separate blocks of a blob, and then
>>>>> you can do partial data transfers that way. I could be misunderstanding the
>>>>> SDKs or how Azure stores data, but my guess is that a whole blob is
>>>>> retrieved and then the local file is able to support partial, block-based
>>>>> reads as you expect from local filesystems. You may be able to double check
>>>>> how much data is being retrieved by looking at where adlfs is mounting your
>>>>> blob storage.
>>>>>
>>>>> For Question 3:
>>>>> You can memory map remote files; it's just that every page fault will
>>>>> be even more expensive than for local files. I am not sure how to tell the
>>>>> dataset API to do memory mapping, and I'm not sure how well that would work
>>>>> over adlfs.
>>>>>
>>>>> For Question 4:
>>>>> Can you try using `pc.scalar(1000)` as shown in the first code excerpt
>>>>> in [1]:
>>>>>
>>>>> >> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>>>> >> pc.multiply(x, y)
>>>>> <pyarrow.DoubleScalar: 72.54>
>>>>>
>>>>> [1]:
>>>>> https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
>>>>>
>>>>> Aldrin Montana
>>>>> Computer Science PhD Student
>>>>> UC Santa Cruz

Re: [Python] - Dataset API - What's happening under the hood?

Posted by Nikhil Makan <ni...@gmail.com>.
Thanks Jacob for the comments. Appreciate it.

Out of interest, does the cloud storage interface you are working on for
Julia support concurrent I/O to improve performance when downloading or
uploading data? I know that with pyarrow, for blob storage, we have to
use an fsspec-compliant library, adlfs. However, concurrent I/O is
still in the works for that library.

Kind regards
Nikhil Makan


Re: [Python] - Dataset API - What's happening under the hood?

Posted by Jacob Quinn <qu...@gmail.com>.
Sorry for the late reply, but thought I'd chime in with a thought or two,
as I've had the chance to work on both the Arrow.jl Julia implementation as
well as recently working on a consistent cloud storage interface (for S3,
Azure, and planned GCP, also for Julia;
https://github.com/JuliaServices/CloudStore.jl).

First, to clarify, all cloud storage providers support "partial" reads in
the form of "ranged" HTTP requests (i.e. with a "Range" header like
"Range: bytes=0-9"). So for a single "object" in a cloud store, you can
request that specific byte ranges of that object be returned.
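
As a small illustration of the header syntax, the sketch below builds a
Range header and mimics what a server returns for it. range_header and
serve_range are hypothetical helpers written for this example, and
serve_range only handles the simple "first-last" form. Both ends of the
range are inclusive, so bytes=0-9 covers ten bytes.

```python
def range_header(first, last):
    """Build the header for an inclusive byte range."""
    return {"Range": f"bytes={first}-{last}"}


def serve_range(obj, header_value):
    """Mimic a server answering a simple 'bytes=first-last' request."""
    unit, _, spec = header_value.partition("=")
    if unit != "bytes":
        raise ValueError("only the 'bytes' range unit is handled here")
    first, _, last = spec.partition("-")
    # The last position is inclusive, hence the + 1 when slicing.
    return obj[int(first):int(last) + 1]


headers = range_header(0, 9)
chunk = serve_range(b"abcdefghijklmnop", headers["Range"])
print(headers, chunk)
```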

How would that interact with stored data files? Well, for the Arrow
IPC/Feather format, you could potentially do a series of these "range"
requests to read a single Feather file's flatbuffer metadata, which
contains the specific byte offsets of columns within the data file. So in
theory, it should be fairly straightforward to apply a kind of "column
selection" operation where only specific columns are actually downloaded
from the cloud store, avoiding a download of the entire file.

For other data formats? It's generally not as applicable, since we don't
have this kind of specific byte information about where certain
rows/columns live within a single object.

On the other hand, the parquet format supports a partitioning scheme that
*IS* more amenable to "partial reads", but in a slightly different way.
Instead of using HTTP Range requests, specific columns or row batches of
columns are stored as separate *objects* in the cloud store. And so by
doing a "list" type of operation on all "objects" in the store, and
reading the overall metadata of the parquet data, we could similarly do a
"column selection" kind of operation by only downloading the specific
*objects* from the cloud store that correspond to the desired columns.

Hopefully that provides a little bit of clarity?

This is somewhat the overall vision that we're working towards with the
Julia implementation to hopefully provide really efficient interop with
cloud-stored data.

-Jacob Quinn


On Sun, Sep 18, 2022 at 7:47 PM Nikhil Makan <ni...@gmail.com>
wrote:

> Thanks Aldrin for the response on this.
>
> Question 1:
> For reference to anyone else who reads this, it appears adlfs does not
> support concurrent io and this is currently being developed.
> https://github.com/fsspec/adlfs/issues/268
>
> Question 2:
> Noted your points. I am using block blobs. If I am understanding you
> correctly are you suggesting just splitting the data up into separate
> blobs? This way if I filter the data it only downloads the blobs that are
> required? This would seem to only work if you know beforehand what the
> filter could be so you can split your data accordingly. However, if you
> wanted to return two columns of all the data I assume this would still
> result in all the blobs being downloaded. I also came across this
> https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-query-acceleration-how-to?tabs=python.
> However this is only for csv/json files.
>
> Are you aware of partial reading, where we take advantage of a columnar
> format such as arrow/parquet, being implemented for any other storage such
> as Google Cloud Storage or Amazon S3? I know PyArrow has native support for
> GCS and S3, however I ran this test example against S3 and saw no real
> improvement. It seems to be the same issue, where the whole file is downloaded.
>
> import pyarrow.dataset as ds
> ds_f = ds.dataset(
> "s3://voltrondata-labs-datasets/nyc-taxi/year=2019/month=1")
> ds_f.head(10)
>
> df = (
>     ds_f
>     .scanner(
>         columns={ # Selections and Projections
>             'passengerCount': ds.field(('passengerCount')),
>         },
>     )
>     .to_table()
>     .to_pandas()
> )
> df.info()
>
> Question 3:
> Thanks, noted.
>
> Question 4:
> Tried this:
> 'passengerCount': pc.multiply(ds.field(('passengerCount')),pa.scalar(1000))
> Same issue -> TypeError: only other expressions allowed as arguments
>
> However it works with this:
> 'passengerCount': pc.multiply(ds.field(('passengerCount')),pc.scalar(1000))
>
> This works as well, as noted previously, so I assume the Python operators
> are mapped across to the compute functions, similar to how using the
> operators on a numpy array or pandas Series just executes np.multiply or
> Series.multiply in the background.
> 'passengerCount': ds.field(('passengerCount'))*1000,
>
> Kind regards
> Nikhil Makan
>
> On Fri, Sep 16, 2022 at 12:28 PM Aldrin <ak...@ucsc.edu> wrote:
>
>> (oh, sorry I misread `pa.scalar` as `pc.scalar`, so please try
>> `pyarrow.scalar` per the documentation)
>>
>> Aldrin Montana
>> Computer Science PhD Student
>> UC Santa Cruz
>>
>>
>> On Thu, Sep 15, 2022 at 5:26 PM Aldrin <ak...@ucsc.edu> wrote:
>>
>>> For Question 2:
>>> At a glance, I don't see anything in adlfs or azure that is able to do
>>> partial reads of a blob. If you're using block blobs, then likely you would
>>> want to store blocks of your file as separate blocks of a blob, and then
>>> you can do partial data transfers that way. I could be misunderstanding the
>>> SDKs or how Azure stores data, but my guess is that a whole blob is
>>> retrieved and then the local file is able to support partial, block-based
>>> reads as you expect from local filesystems. You may be able to double check
>>> how much data is being retrieved by looking at where adlfs is mounting your
>>> blob storage.
>>>
>>> For Question 3:
>>> you can memory map remote files, it's just that every page fault will be
>>> even more expensive than for local files. I am not sure how to tell the
>>> dataset API to do memory mapping, and I'm not sure how well that would work
>>> over adlfs.
>>>
>>> For Question 4:
>>> Can you try using `pc.scalar(1000)` as shown in the first code excerpt
>>> in [1]:
>>>
>>> >> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>> >> pc.multiply(x, y)
>>> <pyarrow.DoubleScalar: 72.54>
>>>
>>> [1]:
>>> https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
>>>
>>> Aldrin Montana
>>> Computer Science PhD Student
>>> UC Santa Cruz
>>>
>>>

Re: [Python] - Dataset API - What's happening under the hood?

Posted by Nikhil Makan <ni...@gmail.com>.
Thanks Aldrin for the response on this.

Question 1:
For reference to anyone else who reads this, it appears adlfs does not
support concurrent io and this is currently being developed.
https://github.com/fsspec/adlfs/issues/268

Question 2:
Noted your points. I am using block blobs. If I am understanding you
correctly are you suggesting just splitting the data up into separate
blobs? This way if I filter the data it only downloads the blobs that are
required? This would seem to only work if you know beforehand what the
filter could be so you can split your data accordingly. However, if you
wanted to return two columns of all the data I assume this would still
result in all the blobs being downloaded. I also came across this
https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-query-acceleration-how-to?tabs=python.
However this is only for csv/json files.
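
A rough sketch of the "split the data into separate blobs" idea, assuming the
blobs are named with hive-style `key=value` path segments. The blob names and
the helpers `parse_partition` and `prune` are hypothetical; this only shows
the selection logic, not how PyArrow implements it.

```python
def parse_partition(path):
    """Extract hive-style key=value segments from an object path."""
    parts = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            parts[key] = value
    return parts


def prune(paths, **wanted):
    """Keep only the paths that the requested values cannot rule out."""
    kept = []
    for path in paths:
        parts = parse_partition(path)
        # A key that is not part of the path cannot rule the blob out.
        if all(parts.get(k, str(v)) == str(v) for k, v in wanted.items()):
            kept.append(path)
    return kept


# Hypothetical blob names, for illustration only.
blobs = [
    "taxinyc/green/year=2018/part-0.feather",
    "taxinyc/green/year=2019/part-0.feather",
    "taxinyc/green/year=2019/part-1.feather",
]
to_download = prune(blobs, year=2019)
print(to_download)
```

A filter on a column that is not part of the path (say vendorID) prunes
nothing, which is the limitation described above: the split only helps for
filters that were anticipated when the data was written.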

Are you aware of partial reading, where we take advantage of a columnar
format such as arrow/parquet, being implemented for any other storage such
as Google Cloud Storage or Amazon S3? I know PyArrow has native support for
GCS and S3, however I ran this test example against S3 and saw no real
improvement. It seems to be the same issue, where the whole file is downloaded.

import pyarrow.dataset as ds
ds_f = ds.dataset(
    "s3://voltrondata-labs-datasets/nyc-taxi/year=2019/month=1")
ds_f.head(10)

df = (
    ds_f
    .scanner(
        columns={ # Selections and Projections
            'passengerCount': ds.field(('passengerCount')),
        },
    )
    .to_table()
    .to_pandas()
)
df.info()

Question 3:
Thanks, noted.

Question 4:
Tried this:
'passengerCount': pc.multiply(ds.field(('passengerCount')),pa.scalar(1000))
Same issue -> TypeError: only other expressions allowed as arguments

However it works with this:
'passengerCount': pc.multiply(ds.field(('passengerCount')),pc.scalar(1000))

This works as well, as noted previously, so I assume the Python operators
are mapped across to the compute functions, similar to how using the
operators on a numpy array or pandas Series just executes np.multiply or
Series.multiply in the background.
'passengerCount': ds.field(('passengerCount'))*1000,
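
The behaviour above can be pictured with a toy model of deferred expressions.
This is an illustrative sketch only, not PyArrow's implementation: the class
`Expr` and the functions `field`, `scalar` and `multiply` are hypothetical
stand-ins. The operator form wraps a plain Python value for you, while the
function form insists that every argument is already an expression.

```python
class Expr:
    """Toy deferred expression: records an operation instead of computing it."""

    def __init__(self, op, *args):
        self.op, self.args = op, args

    def __mul__(self, other):
        # The operator wraps plain Python values, then defers to multiply().
        return multiply(self, other if isinstance(other, Expr) else scalar(other))

    def evaluate(self, row):
        if self.op == "field":
            return row[self.args[0]]
        if self.op == "scalar":
            return self.args[0]
        left, right = (a.evaluate(row) for a in self.args)
        return left * right  # only "multiply" is modelled here


def field(name):
    return Expr("field", name)


def scalar(value):
    return Expr("scalar", value)


def multiply(left, right):
    # The function form is strict, like the error quoted above.
    if not (isinstance(left, Expr) and isinstance(right, Expr)):
        raise TypeError("only other expressions allowed as arguments")
    return Expr("multiply", left, right)


row = {"passengerCount": 2}
via_operator = field("passengerCount") * 1000
via_function = multiply(field("passengerCount"), scalar(1000))
print(via_operator.evaluate(row), via_function.evaluate(row))
```

In this model, passing a bare 1000 (or any already-computed value) to
`multiply` raises the TypeError, while wrapping it with `scalar` gives the
same result as the `*` operator.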

Kind regards
Nikhil Makan

On Fri, Sep 16, 2022 at 12:28 PM Aldrin <ak...@ucsc.edu> wrote:

> (oh, sorry I misread `pa.scalar` as `pc.scalar`, so please try
> `pyarrow.scalar` per the documentation)
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>
> On Thu, Sep 15, 2022 at 5:26 PM Aldrin <ak...@ucsc.edu> wrote:
>
>> For Question 2:
>> At a glance, I don't see anything in adlfs or azure that is able to do
>> partial reads of a blob. If you're using block blobs, then likely you would
>> want to store blocks of your file as separate blocks of a blob, and then
>> you can do partial data transfers that way. I could be misunderstanding the
>> SDKs or how Azure stores data, but my guess is that a whole blob is
>> retrieved and then the local file is able to support partial, block-based
>> reads as you expect from local filesystems. You may be able to double check
>> how much data is being retrieved by looking at where adlfs is mounting your
>> blob storage.
>>
>> For Question 3:
>> you can memory map remote files, it's just that every page fault will be
>> even more expensive than for local files. I am not sure how to tell the
>> dataset API to do memory mapping, and I'm not sure how well that would work
>> over adlfs.
>>
>> For Question 4:
>> Can you try using `pc.scalar(1000)` as shown in the first code excerpt in
>> [1]:
>>
>> >> x, y = pa.scalar(7.8), pa.scalar(9.3)
>> >> pc.multiply(x, y)
>> <pyarrow.DoubleScalar: 72.54>
>>
>> [1]:
>> https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
>>
>> Aldrin Montana
>> Computer Science PhD Student
>> UC Santa Cruz
>>
>>

Re: [Python] - Dataset API - What's happening under the hood?

Posted by Aldrin <ak...@ucsc.edu>.
(oh, sorry I misread `pa.scalar` as `pc.scalar`, so please try
`pyarrow.scalar` per the documentation)

Aldrin Montana
Computer Science PhD Student
UC Santa Cruz


On Thu, Sep 15, 2022 at 5:26 PM Aldrin <ak...@ucsc.edu> wrote:

> For Question 2:
> At a glance, I don't see anything in adlfs or azure that is able to do
> partial reads of a blob. If you're using block blobs, then likely you would
> want to store blocks of your file as separate blocks of a blob, and then
> you can do partial data transfers that way. I could be misunderstanding the
> SDKs or how Azure stores data, but my guess is that a whole blob is
> retrieved and then the local file is able to support partial, block-based
> reads as you expect from local filesystems. You may be able to double check
> how much data is being retrieved by looking at where adlfs is mounting your
> blob storage.
>
> For Question 3:
> you can memory map remote files, it's just that every page fault will be
> even more expensive than for local files. I am not sure how to tell the
> dataset API to do memory mapping, and I'm not sure how well that would work
> over adlfs.
>
> For Question 4:
> Can you try using `pc.scalar(1000)` as shown in the first code excerpt in
> [1]:
>
> >> x, y = pa.scalar(7.8), pa.scalar(9.3)
> >> pc.multiply(x, y)
> <pyarrow.DoubleScalar: 72.54>
>
> [1]:
> https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>

Re: [Python] - Dataset API - What's happening under the hood?

Posted by Nikhil Makan <ni...@gmail.com>.
Apologies for the late reply on this. I just wanted to say thank you Weston
for taking the time to provide that detailed explanation on memory
mapping!  (You should turn that into a blog post ha!)
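
For anyone following along, the difference between a normal read and a
memory-mapped read can be tried out with a small standard-library sketch.
The temporary file and its contents are made up for the example; it shows
the two access styles only and says nothing about which one the Dataset API
uses.

```python
import mmap
import os
import tempfile

payload = b"arrow" * 1000

# Write a small scratch file to read back in two different ways.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
    path = tmp.name

try:
    # A. Normal read: the I/O happens during read(), and the bytes are
    #    copied into a buffer owned by this process.
    with open(path, "rb") as f:
        copied = f.read(5)

    # B. Memory-mapped read: mmap() itself reads nothing; the kernel loads
    #    pages lazily, typically when the mapped bytes are first touched.
    with open(path, "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
            first = mapped[:5]  # may page-fault here on a cold file
            total = len(mapped)
finally:
    os.remove(path)

print(copied, first, total)
```

Both styles return the same bytes; what differs is when the I/O happens and
who owns the buffer. Note that slicing the map in Python copies the bytes
out, so this sketch does not demonstrate zero-copy access.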

Kind regards
Nikhil Makan

On Wed, Sep 21, 2022 at 5:21 PM Weston Pace <we...@gmail.com> wrote:

> > The dataset API still makes use of multiple cores though correct?
>
> Yes.  It tries to use enough threads to ensure it is using the full
> processor.
>
> > How does this then relate to the filesystems interface and native
> > support for HDFS, GCFS and S3. Do these exhibit the same issue?
>
> No, they should not.  I am not aware of the specifics of the Azure
> issue but I know we can handle concurrent reads on the native S3 and
> GCFS filesystems and I assume we can on HDFS but I have never set HDFS
> up myself.
>
> > Further to this, as per my earlier discussions on this thread we are
> > unable to do partial reads of a blob in Azure, I wanted to know if that is
> > possible with any of the other three that have native support. i.e. can we
> > filter the data downloaded from these instead of downloading everything and
> > then filtering?
>
> There are two questions here:
>
> 1. Can we read part of a file from the filesystem?
>
> Yes, all filesystem implementations that I am aware of support this.
>
> 2. Can we reduce the amount of data read by using a filter?
>
> Parquet is the only format that has some form of this.  We can
> eliminate entire row groups from consideration if there is a pushdown
> filter and the row group statistics are informative enough.  All
> formats (on any filesystem, including Azure) support some pushdown
> through partitioning.  If the files are stored in a partitioned manner
> we can possibly eliminate entire directories from consideration with a
> pushdown filter (e.g. if the filter is year==2004 then we can
> eliminate the directory `/year=2003/month=July`)
>
> There is more we could do here, both with parquet (page-level indices)
> and IPC (e.g. arrow/feather files which have no statistics at the
> moment).  We just need someone that has the time and energy to
> implement it.
>
> > I don't think I quite follow this. Happy to be pointed to some
> > documentation to read more on this by the way.
>
> I have yet to find a good guide that explains this so I'll make a
> brief attempt at clarification.
>
> 1. If you read in a buffer of data from the disk, and then you discard
> that buffer (e.g. delete it), then that physical memory can be
> returned to the OS (barring fragmentation) regardless of how it was
> read in (e.g. memory mapping or regular file).
> 2. If you read in a buffer of data from the disk, and you do not
> discard that buffer, then that physical memory cannot be returned to
> the OS unless it is swapped out.  This is true regardless of how it
> was read in.
>
> A. Normal reads
>
> A normal read will first copy the buffer from the disk to the kernel
> page cache.  The caller controls exactly when this I/O occurs because
> it will be during the call to `read()`.  The kernel will then perform
> a memcpy from the kernel page cache to a user-space caller-provided
> buffer that the caller has allocated.
>
> ```
> // Caller provides the buffer
> void* my_buffer = malloc(100);
> // Read happens here, can control exactly when it happens
> read(fp, my_buffer, 100);
> ```
>
> B. Memory mapped reads
>
> A memory mapped read will first copy the buffer from the disk to the
> kernel page cache.  The caller doesn't have much control over when
> this I/O occurs because it will happen whenever the kernel decides it
> needs the page in the page cache to be populated (usually when the
> user first tries to access the data and a page fault occurs).  It will
> then give the user a pointer directly into the kernel page cache.
>
> // Buffer provided by the kernel, no read happens here
> void* my_buffer = mmap(...);
> // ...
> // ...
> // This line probably triggers a page fault and blocks while the data
> // is read from the disk.
> int x = ((int*)my_buffer)[0];
>
> > I thought the basic idea behind memory mapping is that the data
> > structure has the same representation on disk as it does in memory
> > therefore allowing it to not consume additional memory when reading it
>
> No.  We cannot, for example, do arithmetic on data that is on the
> disk.  The CPU requires that data first be brought into RAM.
>
> > So would the dataset API process multiple files potentially quicker
> > without memory mapping.
>
> Probably not.  If your data happened to already be fully in the kernel
> page cache then yes, the dataset API would probably process the file
> slightly faster.  However, this would typically only be true if your
> entire dataset (or at least the working set you are dealing with) is
> smaller than your physical RAM and has been accessed recently.  If the
> data is not already in the kernel page cache then memory mapping will
> probably have a negative effect.  This is because the page faults come
> at unexpected times and can block the process at times we don't expect
> it to.
>
> > Also correct me if I am wrong, but memory mapping is related to the ipc
> > format only, formats such as parquet cannot take advantage of this?
>
> Any format can use memory mapped I/O.
>
> SO....why bother?
>
> Memory mapping is more typically used for IPC (inter-process
> communication).  In particular, it can be used to perform true
> zero-copy IPC.  This would only be true zero-copy with the Arrow IPC
> format.
>
> Process A memory maps a file.
> Process A populates that region of memory with a table it generates in
> some way.
> Process A sends a control signal to process B that the table is ready.
> Process B memory maps the same file (we know it will be in the kernel
> page cache because we just used this RAM to write to it).
> Process B operates on the data in some way.
>
> On Tue, Sep 20, 2022 at 4:46 PM Nikhil Makan <ni...@gmail.com>
> wrote:
> >
> > Hi Weston, thanks for the response!
> >
> > > I would say that this is always a problem.  In the datasets API the
> > goal is to maximize the resource usage within a single process.  Now,
> > it may be a known or expected problem :)
> >
> > The dataset API still makes use of multiple cores though correct?
> > How does this then relate to the filesystems interface and native
> support for HDFS, GCFS and S3. Do these exhibit the same issue? Further to
> this are per my earlier discussions on this thread we are unable to do
> partial reads of a blob in Azure, I wanted to know if that is possible with
> any of the other three that have native support. i.e. can we filter the
> data downloaded from these instead of downloading everything and then
> filtering?
> >
> > > I think the benefits of memory mapping are rather subtle and often
> > misleading.  Datasets can make use of memory mapping for local
> > filesystems.  Doing so will, at best, have a slight performance
> > benefit (avoiding a memcpy) but would most likely decrease performance
> > (by introducing I/O where it is not expected) and it will have no
> > effect whatsoever on the amount of RAM used.
> >
> > I don't think I quite follow this. Happy to be pointed to some
> documentation to read more on this by the way. I thought the basic idea
> behind memory mapping is that the data structure has the same
> representation on disk as it does in memory therefore allowing it to not
> consume additional memory when reading it, which is typical with normal I/O
> operations with reading files. So would the dataset API process multiple
> files potentially quicker without memory mapping. Also correct me if I am
> wrong, but memory mapping is related to the ipc format only, formats such
> as parquet cannot take advantage of this?
> >
> > Kind regards
> > Nikhil Makan
> >
> >
> > On Tue, Sep 20, 2022 at 5:12 AM Weston Pace <we...@gmail.com>
> wrote:
> >>
> >> Sorry for the slow reply.
> >>
> >> > This could be something on the Azure side but I find I am being
> bottlenecked on the download speed and have noticed if I spin up multiple
> Python sessions (or in my case interactive windows) I can increase my
> throughput. Hence I can download each year of the taxinyc dataset in
> separate interactive windows and increase my bandwidth consumed.
> >>
> >> I would say that this is always a problem.  In the datasets API the
> >> goal is to maximize the resource usage within a single process.  Now,
> >> it may be a known or expected problem :)
> >>
> >> > Does the Dataset API make use of memory mapping? Do I have the
> correct understanding that memory mapping is only intended for dealing with
> large data stored on a local file system. Where as data stored on a cloud
> file system in the feather format effectively cannot be memory mapped?
> >>
> >> I think the benefits of memory mapping are rather subtle and often
> >> misleading.  Datasets can make use of memory mapping for local
> >> filesystems.  Doing so will, at best, have a slight performance
> >> benefit (avoiding a memcpy) but would most likely decrease performance
> >> (by introducing I/O where it is not expected) and it will have no
> >> effect whatsoever on the amount of RAM used.
> >>
> >> > This works as well as noted previosuly, so I assume the python
> operators are mapped across similar to what happens when you use the
> operators against a numpy or pandas series it just executes a np.multiply
> or pd. multiply in the background.
> >>
> >> Yes.  However the functions that get mapped can sometimes be
> >> surprising.  Specifically, logical operations map to the _kleene
> >> variation and arithmetic maps to the _checked variation.  You can find
> >> the implementation at [1].  For multiplication this boils down to:
> >>
> >> ```
> >> @staticmethod
> >> cdef Expression _expr_or_scalar(object expr):
> >>     if isinstance(expr, Expression):
> >>         return (<Expression> expr)
> >>     return (<Expression> Expression._scalar(expr))
> >>
> >> ...
> >>
> >> def __mul__(Expression self, other):
> >>     other = Expression._expr_or_scalar(other)
> >>     return Expression._call("multiply_checked", [self, other])
> >> ```
> >>
> >>
> >> On Mon, Sep 19, 2022 at 12:52 AM Jacek Pliszka <ja...@gmail.com>
> wrote:
> >> >
> >> > Re 2.   In Python Azure SDK there is logic for partial blob read:
> >> >
> >> >
> https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobclient?view=azure-python#azure-storage-blob-blobclient-query-blob
> >> >
> >> > However I was unable to use it as it does not support parquet files
> >> > with decimal columns and these are the ones I have.
> >> >
> >> > BR
> >> >
> >> > J
> >> >
> >> > pt., 16 wrz 2022 o 02:26 Aldrin <ak...@ucsc.edu> napisał(a):
> >> > >
> >> > > For Question 2:
> >> > > At a glance, I don't see anything in adlfs or azure that is able to
> do partial reads of a blob. If you're using block blobs, then likely you
> would want to store blocks of your file as separate blocks of a blob, and
> then you can do partial data transfers that way. I could be
> misunderstanding the SDKs or how Azure stores data, but my guess is that a
> whole blob is retrieved and then the local file is able to support partial,
> block-based reads as you expect from local filesystems. You may be able to
> double check how much data is being retrieved by looking at where adlfs is
> mounting your blob storage.
> >> > >
> >> > > For Question 3:
> >> > > you can memory map remote files, it's just that every page fault
> will be even more expensive than for local files. I am not sure how to tell
> the dataset API to do memory mapping, and I'm not sure how well that would
> work over adlfs.
> >> > >
> >> > > For Question 4:
> >> > > Can you try using `pc.scalar(1000)` as shown in the first code
> excerpt in [1]:
> >> > >
> >> > > >> x, y = pa.scalar(7.8), pa.scalar(9.3)
> >> > > >> pc.multiply(x, y)
> >> > > <pyarrow.DoubleScalar: 72.54>
> >> > >
> >> > > [1]:
> https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
> >> > >
> >> > > Aldrin Montana
> >> > > Computer Science PhD Student
> >> > > UC Santa Cruz
> >> > >
> >> > >
> >> > > On Thu, Sep 8, 2022 at 8:26 PM Nikhil Makan <
> nikhilmakan02@gmail.com> wrote:
> >> > >>
> >> > >> Hi There,
> >> > >>
> >> > >> I have been experimenting with Tabular Datasets for data that can
> be larger than memory and had a few questions related to what's going on
> under the hood and how to work with it (I understand it is still
> experimental).
> >> > >>
> >> > >> Question 1: Reading Data from Azure Blob Storage
> >> > >> Now I know the filesystems don't fully support this yet, but there
> is an fsspec compatible library (adlfs) which is shown in the file system
> example which I have used. Example below with the nyc taxi dataset, where I
> am pulling the whole dataset through and writing to disk to the feather
> format.
> >> > >>
> >> > >> import adlfs
> >> > >> import pyarrow.dataset as ds
> >> > >>
> >> > >> fs = adlfs.AzureBlobFileSystem(account_name='azureopendatastorage')
> >> > >>
> >> > >> dataset = ds.dataset('nyctlc/green/', filesystem=fs,
> format='parquet')
> >> > >>
> >> > >> scanner = dataset.scanner()
> >> > >> ds.write_dataset(scanner, f'taxinyc/green/feather/',
> format='feather')
> >> > >>
> >> > >> This could be something on the Azure side but I find I am being
> bottlenecked on the download speed and have noticed if I spin up multiple
> Python sessions (or in my case interactive windows) I can increase my
> throughput. Hence I can download each year of the taxinyc dataset in
> separate interactive windows and increase my bandwidth consumed. The
> tabular dataset documentation notes 'optionally parallel reading.' Do you
> know how I can control this? Or perhaps control the number of concurrent
> connections. Or has this got nothing to do with the arrow and sits purley
> on the Azure side? I have increased the io thread count from the default 8
> to 16 and saw no difference, but could still spin up more interactive
> windows to maximise bandwidth.
> >> > >>
> >> > >> Question 2: Reading Filtered Data from Azure Blob Storage
> >> > >> Unfortunately I don't quite have a repeatable example here.
> However using the same data above, only this time I have each year as a
> feather file instead of a parquet file. I have uploaded this to my own
> Azure blob storage account.
> >> > >> I am trying to read a subset of this data from the blob storage by
> selecting columns and filtering the data. The final result should be a
> dataframe that takes up around 240 mb of memory (I have tested this by
> working with the data locally). However when I run this by connecting to
> the Azure blob storage it takes over an hour to run and it's clear it's
> downloading a lot more data than I would have thought. Given the file
> formats are feather that supports random access I would have thought I
> would only have to download the 240 mb?
> >> > >>
> >> > >> Is there more going on in the background? Perhaps I am using this
> incorrectly?
> >> > >>
> >> > >> import adlfs
> >> > >> import pyarrow.dataset as ds
> >> > >>
> >> > >> connection_string = ''
> >> > >> fs =
> adlfs.AzureBlobFileSystem(connection_string=connection_string,)
> >> > >>
> >> > >> ds_f = ds.dataset("taxinyc/green/feather/", format='feather')
> >> > >>
> >> > >> df = (
> >> > >>     ds_f
> >> > >>     .scanner(
> >> > >>         columns={ # Selections and Projections
> >> > >>             'passengerCount': ds.field(('passengerCount'))*1000,
> >> > >>             'tripDistance': ds.field(('tripDistance'))
> >> > >>         },
> >> > >>         filter=(ds.field('vendorID') == 1)
> >> > >>     )
> >> > >>     .to_table()
> >> > >>     .to_pandas()
> >> > >> )
> >> > >>
> >> > >> df.info()
> >> > >>
> >> > >> Question 3: How is memory mapping being applied?
> >> > >> Does the Dataset API make use of memory mapping? Do I have the
> correct understanding that memory mapping is only intended for dealing with
> large data stored on a local file system. Where as data stored on a cloud
> file system in the feather format effectively cannot be memory mapped?
> >> > >>
> >> > >> Question 4: Projections
> >> > >> I noticed in the scanner function when projecting a column I am
> unable to use any compute functions (I get a Type Error: only other
> expressions allowed as arguments) yet I am able to multiply this using
> standard python arithmetic.
> >> > >>
> >> > >> 'passengerCount': ds.field(('passengerCount'))*1000,
> >> > >>
> >> > >> 'passengerCount': pc.multiply(ds.field(('passengerCount')),1000),
> >> > >>
> >> > >> Is this correct or am I to process this using an iterator via
> record batch to do this out of core? Is it actually even doing it out of
> core with " *1000 ".
> >> > >>
> >> > >> Thanks for your help in advance. I have been following the Arrow
> project for the last two years but have only recently decided to dive into
> it in depth to explore it for various use cases. I am particularly
> interested in the out-of-core data processing and the interaction with
> cloud storages to retrieve only a selection of data from feather files.
> Hopefully at some point when I have enough knowledge I can contribute to
> this amazing project.
> >> > >>
> >> > >> Kind regards
> >> > >> Nikhil Makan
>

Re: [Python] - Dataset API - What's happening under the hood?

Posted by Weston Pace <we...@gmail.com>.
> The dataset API still makes use of multiple cores though, correct?

Yes.  It tries to use enough threads to ensure it is using the full processor.

> How does this then relate to the filesystems interface and native support for HDFS, GCFS and S3. Do these exhibit the same issue?

No, they should not.  I am not aware of the specifics of the Azure
issue, but I know we can handle concurrent reads on the native S3 and
GCFS filesystems.  I assume we can on HDFS too, but I have never set
HDFS up myself.

> Further to this, as per my earlier discussions on this thread, we are unable to do partial reads of a blob in Azure. I wanted to know if that is possible with any of the other three that have native support, i.e. can we filter the data downloaded from these instead of downloading everything and then filtering?

There are two questions here:

1. Can we read part of a file from the filesystem?

Yes, all filesystem implementations that I am aware of support this.

2. Can we reduce the amount of data read by using a filter?

Parquet is the only format that has some form of this.  We can
eliminate entire row groups from consideration if there is a pushdown
filter and the row group statistics are informative enough.  All
formats (on any filesystem, including Azure) get some pushdown through
partitioning.  If the files are stored in a partitioned manner we can
possibly eliminate entire directories from consideration with a
pushdown filter (e.g. if the filter is year==2004 then we can
eliminate the directory `/year=2003/month=July`).

There is more we could do here, both with parquet (page-level indices)
and IPC (e.g. arrow/feather files which have no statistics at the
moment).  We just need someone that has the time and energy to
implement it.

> I don't think I quite follow this. Happy to be pointed to some documentation to read more on this by the way.

I have yet to find a good guide that explains this so I'll make a
brief attempt at clarification.

1. If you read in a buffer of data from the disk, and then you discard
that buffer (e.g. delete it), then that physical memory can be
returned to the OS (barring fragmentation) regardless of how it was
read in (e.g. memory mapping or regular file).
2. If you read in a buffer of data from the disk, and you do not
discard that buffer, then that physical memory cannot be returned to
the OS unless it is swapped out.  This is true regardless of how it
was read in.

A. Normal reads

A normal read will first copy the buffer from the disk to the kernel
page cache.  The caller controls exactly when this I/O occurs because
it will be during the call to `read()`.  The kernel will then perform
a memcpy from the kernel page cache to a user-space buffer that the
caller has allocated.

```
// Caller provides the buffer
void* my_buffer = malloc(100);
// Read happens here, so the caller controls exactly when the I/O occurs
// (fd is an already-open file descriptor)
read(fd, my_buffer, 100);
```

B. Memory mapped reads

A memory mapped read will first copy the buffer from the disk to the
kernel page cache.  The caller doesn't have much control over when
this I/O occurs because it will happen whenever the kernel decides it
needs the page in the page cache to be populated (usually when the
user first tries to access the data and a page fault occurs).  It will
then give the user a pointer directly into the kernel page cache.

```
// Buffer provided by the kernel, no read happens here
void* my_buffer = mmap(...);
// ...
// ...
// This line probably triggers a page fault and blocks while the data
// is read from the disk.
int x = ((int*)my_buffer)[0];
```
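The same contrast can be sketched in Python using only the standard library. The file name and sizes are arbitrary; the point is where the I/O happens, not the numbers.

```python
import mmap
import os
import tempfile

# Throwaway 4096-byte file so the example is self-contained
path = os.path.join(tempfile.mkdtemp(), "data.bin")
with open(path, "wb") as f:
    f.write(bytes(range(256)) * 16)

# A. Normal read: the I/O happens during read(), and the bytes are copied
# from the kernel page cache into a buffer owned by this process.
with open(path, "rb") as f:
    f.seek(100)
    copied = f.read(4)

# B. Memory mapped read: mmap() itself does not read the file contents.
# The kernel loads the page when it is first touched, here by the slice.
with open(path, "rb") as f:
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
        touched = mapped[100:104]

print(copied, touched)
```

Both variants return the same bytes; they differ only in when the disk access is triggered and who owns the buffer.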

> I thought the basic idea behind memory mapping is that the data structure has the same representation on disk as it does in memory therefore allowing it to not consume additional memory when reading it

No.  We cannot, for example, do arithmetic on data that is on the
disk.  The CPU requires that data first be brought into RAM.

> So would the dataset API process multiple files potentially quicker without memory mapping?

Probably not.  If your data happened to already be fully in the kernel
page cache then yes, the dataset API would probably process the file
slightly faster.  However, this would typically only be true if your
entire dataset (or at least the working set you are dealing with) is
smaller than your physical RAM and has been accessed recently.  If the
data is not already in the kernel page cache then memory mapping will
probably have a negative effect.  This is because the page faults come
at unexpected times and can block the process at times we don't expect
it to.

> Also correct me if I am wrong, but memory mapping is related to the ipc format only, formats such as parquet cannot take advantage of this?

Any format can use memory mapped I/O.

SO....why bother?

Memory mapping is more typically used for IPC.  In particular, it can
be used to perform true zero-copy IPC.  This IPC would only be true
zero copy with the IPC format.

1. Process A memory maps a file.
2. Process A populates that region of memory with a table it generates in some way.
3. Process A sends a control signal to process B that the table is ready.
4. Process B memory maps the same file (we know it will be in the kernel
page cache because we just used this RAM to write to it).
5. Process B operates on the data in some way.


Re: [Python] - Dataset API - What's happening under the hood?

Posted by Nikhil Makan <ni...@gmail.com>.
Hi Weston, thanks for the response!

> I would say that this is always a problem.  In the datasets API the
goal is to maximize the resource usage within a single process.  Now,
it may be a known or expected problem :)

The dataset API still makes use of multiple cores though, correct?
How does this then relate to the filesystems interface and native support
for HDFS, GCFS and S3? Do these exhibit the same issue? Further to this, as
per my earlier discussions on this thread, we are unable to do partial reads
of a blob in Azure. I wanted to know if that is possible with any of the
other three that have native support, i.e. can we filter the data
downloaded from these instead of downloading everything and then filtering?

> I think the benefits of memory mapping are rather subtle and often
misleading.  Datasets can make use of memory mapping for local
filesystems.  Doing so will, at best, have a slight performance
benefit (avoiding a memcpy) but would most likely decrease performance
(by introducing I/O where it is not expected) and it will have no
effect whatsoever on the amount of RAM used.

I don't think I quite follow this. Happy to be pointed to some
documentation to read more on this by the way. I thought the basic idea
behind memory mapping is that the data structure has the same
representation on disk as it does in memory, so reading it does not
consume the additional memory that normal file I/O typically does. So
would the dataset API process multiple files potentially quicker without
memory mapping? Also, correct me if I am wrong, but is memory mapping
related to the IPC format only, so that formats such as Parquet cannot
take advantage of it?

Kind regards
Nikhil Makan


On Tue, Sep 20, 2022 at 5:12 AM Weston Pace <we...@gmail.com> wrote:

> Sorry for the slow reply.
>
> > This could be something on the Azure side but I find I am being
> bottlenecked on the download speed and have noticed if I spin up multiple
> Python sessions (or in my case interactive windows) I can increase my
> throughput. Hence I can download each year of the taxinyc dataset in
> separate interactive windows and increase my bandwidth consumed.
>
> I would say that this is always a problem.  In the datasets API the
> goal is to maximize the resource usage within a single process.  Now,
> it may be a known or expected problem :)
>
> > Does the Dataset API make use of memory mapping? Do I have the correct
> understanding that memory mapping is only intended for dealing with large
> data stored on a local file system. Where as data stored on a cloud file
> system in the feather format effectively cannot be memory mapped?
>
> I think the benefits of memory mapping are rather subtle and often
> misleading.  Datasets can make use of memory mapping for local
> filesystems.  Doing so will, at best, have a slight performance
> benefit (avoiding a memcpy) but would most likely decrease performance
> (by introducing I/O where it is not expected) and it will have no
> effect whatsoever on the amount of RAM used.
>
> > This works as well as noted previosuly, so I assume the python operators
> are mapped across similar to what happens when you use the operators
> against a numpy or pandas series it just executes a np.multiply or pd.
> multiply in the background.
>
> Yes.  However the functions that get mapped can sometimes be
> surprising.  Specifically, logical operations map to the _kleene
> variation and arithmetic maps to the _checked variation.  You can find
> the implementation at [1].  For multiplication this boils down to:
>
> ```
> @staticmethod
> cdef Expression _expr_or_scalar(object expr):
>     if isinstance(expr, Expression):
>         return (<Expression> expr)
>     return (<Expression> Expression._scalar(expr))
>
> ...
>
> def __mul__(Expression self, other):
>     other = Expression._expr_or_scalar(other)
>     return Expression._call("multiply_checked", [self, other])
> ```
>
>

Re: [Python] - Dataset API - What's happening under the hood?

Posted by Weston Pace <we...@gmail.com>.
Sorry for the slow reply.

> This could be something on the Azure side but I find I am being bottlenecked on the download speed and have noticed if I spin up multiple Python sessions (or in my case interactive windows) I can increase my throughput. Hence I can download each year of the taxinyc dataset in separate interactive windows and increase my bandwidth consumed.

I would say that this is always a problem.  In the datasets API the
goal is to maximize the resource usage within a single process.  Now,
it may be a known or expected problem :)

> Does the Dataset API make use of memory mapping? Do I have the correct understanding that memory mapping is only intended for dealing with large data stored on a local file system, whereas data stored on a cloud file system in the Feather format effectively cannot be memory mapped?

I think the benefits of memory mapping are rather subtle and often
misleading.  Datasets can make use of memory mapping for local
filesystems.  Doing so will, at best, have a slight performance
benefit (avoiding a memcpy) but would most likely decrease performance
(by introducing I/O where it is not expected) and it will have no
effect whatsoever on the amount of RAM used.

> This works as well, as noted previously, so I assume the Python operators are mapped across, similar to what happens when you use the operators against a NumPy array or pandas Series: it just executes np.multiply or pd.multiply in the background.

Yes.  However the functions that get mapped can sometimes be
surprising.  Specifically, logical operations map to the _kleene
variation and arithmetic maps to the _checked variation.  You can find
the implementation at [1].  For multiplication this boils down to:

```
@staticmethod
cdef Expression _expr_or_scalar(object expr):
    if isinstance(expr, Expression):
        return (<Expression> expr)
    return (<Expression> Expression._scalar(expr))

...

def __mul__(Expression self, other):
    other = Expression._expr_or_scalar(other)
    return Expression._call("multiply_checked", [self, other])
```



Re: [Python] - Dataset API - What's happening under the hood?

Posted by Jacek Pliszka <ja...@gmail.com>.
Re 2. The Azure SDK for Python has logic for partial blob reads:

https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobclient?view=azure-python#azure-storage-blob-blobclient-query-blob

However I was unable to use it as it does not support parquet files
with decimal columns and these are the ones I have.

BR

J


Re: [Python] - Dataset API - What's happening under the hood?

Posted by Aldrin <ak...@ucsc.edu>.
For Question 2:
At a glance, I don't see anything in adlfs or Azure that is able to do
partial reads of a blob. If you're using block blobs, then likely you would
want to store blocks of your file as separate blocks of a blob, and then
you can do partial data transfers that way. I could be misunderstanding the
SDKs or how Azure stores data, but my guess is that a whole blob is
retrieved and then the local file is able to support partial, block-based
reads as you expect from local filesystems. You may be able to double check
how much data is being retrieved by looking at where adlfs is mounting your
blob storage.
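
For what it's worth, the azure-storage-blob package does let a client request a byte range through BlobClient.download_blob(offset=..., length=...), so one way to test the guess above is to fetch a small range directly. The sketch below is only an illustration: the helper names are hypothetical, and FakeBlobClient is a stand-in written for this example so the code runs without an Azure account. It reads the last 10 bytes of an Arrow IPC (Feather V2) file, which hold a 4-byte little-endian footer length followed by the magic string ARROW1.

```python
def read_blob_range(blob_client, offset, length):
    """Download only `length` bytes starting at `offset` (hypothetical helper)."""
    downloader = blob_client.download_blob(offset=offset, length=length)
    return downloader.readall()


def read_arrow_file_tail(blob_client):
    """Return (footer_length, magic) from the final 10 bytes of the blob."""
    size = blob_client.get_blob_properties().size
    tail = read_blob_range(blob_client, size - 10, 10)
    footer_length = int.from_bytes(tail[:4], "little")
    return footer_length, tail[4:]


# --- Stand-in for azure.storage.blob.BlobClient, for illustration only ---
class _FakeDownloader:
    def __init__(self, data):
        self._data = data

    def readall(self):
        return self._data


class _FakeProperties:
    def __init__(self, size):
        self.size = size


class FakeBlobClient:
    def __init__(self, data):
        self._data = data
        self.bytes_served = 0  # lets us see how much was "transferred"

    def get_blob_properties(self):
        return _FakeProperties(len(self._data))

    def download_blob(self, offset=None, length=None):
        chunk = self._data[offset:offset + length]
        self.bytes_served += len(chunk)
        return _FakeDownloader(chunk)


# Fake blob: 1000 filler bytes, a footer length of 42, then the magic string.
payload = b"\x00" * 1000 + (42).to_bytes(4, "little") + b"ARROW1"
client = FakeBlobClient(payload)
footer_length, magic = read_arrow_file_tail(client)
print(footer_length, magic, client.bytes_served)
```

Against a real account the client would come from something like BlobClient.from_connection_string(conn_str, container_name, blob_name); whether adlfs issues such ranged requests when Arrow reads through it is a separate question that this sketch does not answer.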

For Question 3:
You can memory map remote files, it's just that every page fault will be
even more expensive than for local files. I am not sure how to tell the
dataset API to do memory mapping, and I'm not sure how well that would work
over adlfs.

For Question 4:
Can you try using `pc.scalar(1000)` as shown in the first code excerpt in
[1]:

>>> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>> pc.multiply(x, y)
<pyarrow.DoubleScalar: 72.54>

[1]:
https://arrow.apache.org/docs/python/compute.html#standard-compute-functions

Aldrin Montana
Computer Science PhD Student
UC Santa Cruz

