You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by Nikhil Makan <ni...@gmail.com> on 2022/10/10 00:08:16 UTC

[Python] - Dataset- Write Table to Feather

Hi All,

I have a situation where I am running a lot of simulations in parallel
(100k+). The results of each simulation get written to an arrow file. A
result file contains around 8 columns and 1 row.

After all simulations have run I want to be able to merge all these files
up into a single arrow file.

   - To do this I have chosen to use the Dataset API which reads all the
   files (this takes around 12 mins)
   - I then call to_table() on the dataset to bring it into memory
   - Finally I write the table to an arrow file using
   feather.write_feather()

Unfortunately I do not have a reproducible example, but hopefully the
answer to this question won't require one.

The part I find strange is the size of the combined arrow file (187 mb) on
disk and time it takes to write and read that file (> 1min).

Alternatively I can convert the table to a pandas dataframe by calling
to_pandas() and then use to_feather() on the dataframe. The resulting file
on disk is now only 5.5 mb and naturally writes and opens in a flash.

I feel like this has something to do with partitions and how the table is
being structured coming from the dataset API that is then preserved when
writing to a file.

Does anyone know why this would be the case and how to achieve the same
outcome as done with the intermediate step by converting to pandas.

Kind regards
Nikhil Makan

Re: [Python] - Dataset- Write Table to Feather

Posted by Aldrin <ak...@ucsc.edu>.
>  The simulations are all running in parallel so writing to the same file
is challenging

How is each simulation managed? Typically you would launch each process, in
which case you can write to as many files as processes you launch at a time.

>  The raw results of each simulation gets read in using Pandas and written
to an arrow file after a simulation is complete

This is potentially another point where you can coalesce outputs. Perhaps
the process that reads simulation results can read many simulation results
instead of 1 so that the output arrow file is larger.

The last thing I can think of is that sometimes blob storage allows you to
group many writes together as "partial writes" to a single blob. This is
necessary because it's extremely common that you have a single named object
that must be spread across storage units (e.g. inodes or extents or data
pages etc.). This SO post [1] mentions blocks and block lists which could
be used to write each small file as a blob block which you then can create
many lists out of. The Azure Blob Storage documentation [2] also lists
"page blobs" and "append blobs". So, this third suggestion is to see if you
can use one of these approaches to address the overhead of listing all of
the files (which David mentions). It looks like both block blobs and append
blobs (the ones I think are most useful for you) have a limit of ~50,000
blocks/appends, so you would need some simple naming scheme to be sure you
don't overrun that limit. I'm not sure if it's possible, but hopefully you
can write them in such a way that the Dataset API (or something similar)
can combine each batch efficiently (depending on size of each batch and how
Arrow combines compressed batches, it may be better to not compress until
you combine the small files).

[1]: https://stackoverflow.com/a/61156040
[2]:
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api

Aldrin Montana
Computer Science PhD Student
UC Santa Cruz


On Mon, Oct 10, 2022 at 4:49 AM David Li <li...@apache.org> wrote:

> Which filesystem implementation are you using for Azure? I would guess
> that part of the bottleneck might even just be in _listing_ all the files.
> I believe some of the S3 filesystem implementations try to parallelize this
> step for that reason, but if all files lie in the same prefix, even that
> may not be possible. I am not sure if any of the Azure Blob Storage
> filesystem implementations do this. (Also, Arrow Dataset plans to - but
> does not yet support - pipelining listing/reading, for this reason.)
>
> Another solution may be to turn up the fragment readahead settings while
> reading, since you have so many small files. Unfortunately it appears this
> is only available in PyArrow starting in 10.0.0 [1].
>
> Compression is not helping you for these single-row files but I also would
> not think it's a major contributor to the runtime; I think you're mostly
> getting crushed under the overhead of listing, then reading (and opening
> HTTP connections for) 100k files. (And because the readers try to minimize
> I/O for larger files, they're generally making multiple requests per file,
> which in this case is hurting you even more.)
>
> The optimal chunk size is going to depend on your application. Chunk size
> is mostly controlled by how you are generating the data. You can think of
> it as the intra-file unit of parallelism in this case (though I would think
> the equivalent to the Spark example is to have multiple files, one per
> worker).
>
> [1]:
> https://arrow.apache.org/docs/dev/python/generated/pyarrow.dataset.Scanner.html?highlight=fragment_readahead
>
> On Sun, Oct 9, 2022, at 22:36, Cedric Yau wrote:
>
> If each file is just one row, then you might be better off writing the
> values out as a Tab Separate Values file.  Those can all be concatenated
> together.
>
> The data is also small enough to transmit via a queue.  I'm primarily AWS
> but Azure Queue Storage appears to be the right service:
> How to use Azure Queue Storage from Python | Microsoft Learn
> <https://learn.microsoft.com/en-us/azure/storage/queues/storage-python-how-to-use-queue-storage?tabs=python%2Cenvironment-variable-windows>
>
> This would allow you to gather results as they are ready instead of
> waiting until the end.
>
> Cedric
>
> On Sun, Oct 9, 2022 at 10:27 PM Nikhil Makan <ni...@gmail.com>
> wrote:
>
> That's it! Thanks David.
>
> to_table().column(0).num_chunks = to_table().num_rows, therefore
> combine_chunks() merged them all into one.
>
> However I need to unpack the finer details of this a bit more:
>
>    - Given I have over 100k simulations producing these 'single row'
>    files. What's the best way to handle this afterwards or is there a better
>    way to store this from the start. The simulations are all running in
>    parallel so writing to the same file is challenging and I don't want to go
>    down the route of implementing a database of some sort. Would storing them
>    as uncompressed arrow files improve performance when reading in with the
>    dataset API or is there another more efficient way to combine them?
>    - What is optimal for chunks? Naturally a chunk for each row is not
>    efficient. Leaning on some knowledge I have with Spark the idea is to
>    partition your data so it can be spread across the number of nodes in the
>    cluster, to few partitions meant an under utilised cluster. What should be
>    done with chunks and do we have control over setting this?
>
> Kind regards
> Nikhil Makan
>
>
> On Mon, Oct 10, 2022 at 2:21 PM David Li <li...@apache.org> wrote:
>
>
> I would guess that because the 187mb file is generated from 100,000+
> files, and each input file is one row, you are hitting basically a
> pathological case for Dataset/Arrow. Namely, the Dataset code isn't going
> to consolidate input batches together, and so each input batch (= 1 row) is
> making it into the output file. And there's some metadata per batch (=per
> row!), inflating the storage requirements, and completely inhibiting
> compression. (You're running LZ4 on each individual value in each row!)
>
> To check this, can you call combine_chunks() [1] after to_table, before
> write_feather? This will combine the record batches (well, technically,
> chunks of the ChunkedArrays) into a single record batch, and I would guess
> you'll end up with something similar to the to_pandas().to_feather() case
> (without bouncing through Pandas).
>
> You could also check this with to_table().column(0).num_chunks (I'd expect
> this would be equal to to_table().num_rows).
>
> [1]:
> https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.combine_chunks
>
> On Sun, Oct 9, 2022, at 21:12, Nikhil Makan wrote:
>
> Hi Will,
>
> For clarity the simulation files do get written to an Azure Blob Storage,
> however to simplify things I have not tried to read the data directly from
> the cloud storage. I have downloaded it first and then loaded it into a
> dataset locally (which takes 12 mins). The process to produce the arrow
> file for each simulation is done through pandas. The raw results of each
> simulation gets read in using Pandas and written to an arrow file after a
> simulation is complete. The 100 000+ files are therefore in the arrow
> format using LZ4 compression.
>
> The table retrieved from the dataset object is then written to an arrow
> file using feather.write_feather which again by default uses LZ4.
>
> Do you know if there is any way to inspect the two files or tables to get
> more information about them as I can't understand how I have two arrow
> files, one which is 187 mb the other 5.5mb however both with the same
> compression LZ4, schema, shape and nbytes when read in.
>
> I can even read in the 187mb arrow file and write it back to disk and it
> remains 187 mb, so there is definitely some property of the arrow table
> that I am not seeing. It is not necessarily a property of just the file.
>
> Kind regards
> Nikhil Makan
>
>
> On Mon, Oct 10, 2022 at 1:38 PM Will Jones <wi...@gmail.com>
> wrote:
>
> Hi Nikhil,
>
> To do this I have chosen to use the Dataset API which reads all the files
> (this takes around 12 mins)
>
>
> Given the number of files (100k+ right?) this does seem surprising.
> Especially if you are using a remote filesystem like Azure or S3. Perhaps
> you should consider having your application record the file paths of each
> simulation result; then the Datasets API doesn't need to spend time
> resolving all the file paths.
>
> The part I find strange is the size of the combined arrow file (187 mb) on
> disk and time it takes to write and read that file (> 1min).
>
>
> On the size, you should check which compression you are using. There are
> some code paths that write uncompressed data by default and some code paths
> that do. The Pandas to_feather() uses LZ4 by default; it's possible the
> other way you are writing isn't. See IPC write options [1].
>
> On the time to read, that seems very long for local, and even for remote
> (Azure?).
>
> Best,
>
> Will Jones
>
> [1]
> https://arrow.apache.org/docs/python/generated/pyarrow.ipc.IpcWriteOptions.html#pyarrow.ipc.IpcWriteOptions
>
>
> On Sun, Oct 9, 2022 at 5:08 PM Nikhil Makan <ni...@gmail.com>
> wrote:
>
> Hi All,
>
> I have a situation where I am running a lot of simulations in parallel
> (100k+). The results of each simulation get written to an arrow file. A
> result file contains around 8 columns and 1 row.
>
> After all simulations have run I want to be able to merge all these files
> up into a single arrow file.
>
>    - To do this I have chosen to use the Dataset API which reads all the
>    files (this takes around 12 mins)
>    - I then call to_table() on the dataset to bring it into memory
>    - Finally I write the table to an arrow file using
>    feather.write_feather()
>
> Unfortunately I do not have a reproducible example, but hopefully the
> answer to this question won't require one.
>
> The part I find strange is the size of the combined arrow file (187 mb) on
> disk and time it takes to write and read that file (> 1min).
>
> Alternatively I can convert the table to a pandas dataframe by calling
> to_pandas() and then use to_feather() on the dataframe. The resulting file
> on disk is now only 5.5 mb and naturally writes and opens in a flash.
>
> I feel like this has something to do with partitions and how the table is
> being structured coming from the dataset API that is then preserved when
> writing to a file.
>
> Does anyone know why this would be the case and how to achieve the same
> outcome as done with the intermediate step by converting to pandas.
>
> Kind regards
> Nikhil Makan
>
>
>
>
> --
> Cedric Yau
> cedricyau@gmail.com
>
>
>

Re: [Python] - Dataset- Write Table to Feather

Posted by David Li <li...@apache.org>.
Which filesystem implementation are you using for Azure? I would guess that part of the bottleneck might even just be in _listing_ all the files. I believe some of the S3 filesystem implementations try to parallelize this step for that reason, but if all files lie in the same prefix, even that may not be possible. I am not sure if any of the Azure Blob Storage filesystem implementations do this. (Also, Arrow Dataset plans to - but does not yet support - pipelining listing/reading, for this reason.)

Another solution may be to turn up the fragment readahead settings while reading, since you have so many small files. Unfortunately it appears this is only available in PyArrow starting in 10.0.0 [1].

Compression is not helping you for these single-row files but I also would not think it's a major contributor to the runtime; I think you're mostly getting crushed under the overhead of listing, then reading (and opening HTTP connections for) 100k files. (And because the readers try to minimize I/O for larger files, they're generally making multiple requests per file, which in this case is hurting you even more.)

The optimal chunk size is going to depend on your application. Chunk size is mostly controlled by how you are generating the data. You can think of it as the intra-file unit of parallelism in this case (though I would think the equivalent to the Spark example is to have multiple files, one per worker). 

[1]: https://arrow.apache.org/docs/dev/python/generated/pyarrow.dataset.Scanner.html?highlight=fragment_readahead

On Sun, Oct 9, 2022, at 22:36, Cedric Yau wrote:
> If each file is just one row, then you might be better off writing the values out as a Tab Separate Values file.  Those can all be concatenated together.
> 
> The data is also small enough to transmit via a queue.  I'm primarily AWS but Azure Queue Storage appears to be the right service:
> How to use Azure Queue Storage from Python | Microsoft Learn <https://learn.microsoft.com/en-us/azure/storage/queues/storage-python-how-to-use-queue-storage?tabs=python%2Cenvironment-variable-windows>
> 
> This would allow you to gather results as they are ready instead of waiting until the end.
> 
> Cedric
> 
> On Sun, Oct 9, 2022 at 10:27 PM Nikhil Makan <ni...@gmail.com> wrote:
>> That's it! Thanks David.
>> 
>> to_table().column(0).num_chunks = to_table().num_rows, therefore combine_chunks() merged them all into one.
>> 
>> However I need to unpack the finer details of this a bit more:
>>  * Given I have over 100k simulations producing these 'single row' files. What's the best way to handle this afterwards or is there a better way to store this from the start. The simulations are all running in parallel so writing to the same file is challenging and I don't want to go down the route of implementing a database of some sort. Would storing them as uncompressed arrow files improve performance when reading in with the dataset API or is there another more efficient way to combine them?
>>  * What is optimal for chunks? Naturally a chunk for each row is not efficient. Leaning on some knowledge I have with Spark the idea is to partition your data so it can be spread across the number of nodes in the cluster, to few partitions meant an under utilised cluster. What should be done with chunks and do we have control over setting this?
>> Kind regards
>> Nikhil Makan
>> 
>> 
>> On Mon, Oct 10, 2022 at 2:21 PM David Li <li...@apache.org> wrote:
>>> __
>>> I would guess that because the 187mb file is generated from 100,000+ files, and each input file is one row, you are hitting basically a pathological case for Dataset/Arrow. Namely, the Dataset code isn't going to consolidate input batches together, and so each input batch (= 1 row) is making it into the output file. And there's some metadata per batch (=per row!), inflating the storage requirements, and completely inhibiting compression. (You're running LZ4 on each individual value in each row!)
>>> 
>>> To check this, can you call combine_chunks() [1] after to_table, before write_feather? This will combine the record batches (well, technically, chunks of the ChunkedArrays) into a single record batch, and I would guess you'll end up with something similar to the to_pandas().to_feather() case (without bouncing through Pandas). 
>>> 
>>> You could also check this with to_table().column(0).num_chunks (I'd expect this would be equal to to_table().num_rows).
>>> 
>>> [1]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.combine_chunks
>>> 
>>> On Sun, Oct 9, 2022, at 21:12, Nikhil Makan wrote:
>>>> Hi Will,
>>>> 
>>>> For clarity the simulation files do get written to an Azure Blob Storage, however to simplify things I have not tried to read the data directly from the cloud storage. I have downloaded it first and then loaded it into a dataset locally (which takes 12 mins). The process to produce the arrow file for each simulation is done through pandas. The raw results of each simulation gets read in using Pandas and written to an arrow file after a simulation is complete. The 100 000+ files are therefore in the arrow format using LZ4 compression.
>>>> 
>>>> The table retrieved from the dataset object is then written to an arrow file using feather.write_feather which again by default uses LZ4.
>>>> 
>>>> Do you know if there is any way to inspect the two files or tables to get more information about them as I can't understand how I have two arrow files, one which is 187 mb the other 5.5mb however both with the same compression LZ4, schema, shape and nbytes when read in.
>>>> 
>>>> I can even read in the 187mb arrow file and write it back to disk and it remains 187 mb, so there is definitely some property of the arrow table that I am not seeing. It is not necessarily a property of just the file.
>>>> 
>>>> Kind regards
>>>> Nikhil Makan
>>>> 
>>>> 
>>>> On Mon, Oct 10, 2022 at 1:38 PM Will Jones <wi...@gmail.com> wrote:
>>>>> Hi Nikhil,
>>>>> 
>>>>>> To do this I have chosen to use the Dataset API which reads all the files (this takes around 12 mins)
>>>>> 
>>>>> Given the number of files (100k+ right?) this does seem surprising. Especially if you are using a remote filesystem like Azure or S3. Perhaps you should consider having your application record the file paths of each simulation result; then the Datasets API doesn't need to spend time resolving all the file paths.
>>>>> 
>>>>>> The part I find strange is the size of the combined arrow file (187 mb) on disk and time it takes to write and read that file (> 1min).
>>>>> 
>>>>> On the size, you should check which compression you are using. There are some code paths that write uncompressed data by default and some code paths that do. The Pandas to_feather() uses LZ4 by default; it's possible the other way you are writing isn't. See IPC write options [1].
>>>>> 
>>>>> On the time to read, that seems very long for local, and even for remote (Azure?).
>>>>> 
>>>>> Best,
>>>>> 
>>>>> Will Jones
>>>>> 
>>>>> [1] https://arrow.apache.org/docs/python/generated/pyarrow.ipc.IpcWriteOptions.html#pyarrow.ipc.IpcWriteOptions
>>>>>  
>>>>> 
>>>>> On Sun, Oct 9, 2022 at 5:08 PM Nikhil Makan <ni...@gmail.com> wrote:
>>>>>> Hi All,
>>>>>> 
>>>>>> I have a situation where I am running a lot of simulations in parallel (100k+). The results of each simulation get written to an arrow file. A result file contains around 8 columns and 1 row.
>>>>>> 
>>>>>> After all simulations have run I want to be able to merge all these files up into a single arrow file.
>>>>>>  * To do this I have chosen to use the Dataset API which reads all the files (this takes around 12 mins)
>>>>>>  * I then call to_table() on the dataset to bring it into memory
>>>>>>  * Finally I write the table to an arrow file using feather.write_feather()
>>>>>> Unfortunately I do not have a reproducible example, but hopefully the answer to this question won't require one.
>>>>>> 
>>>>>> The part I find strange is the size of the combined arrow file (187 mb) on disk and time it takes to write and read that file (> 1min).
>>>>>> 
>>>>>> Alternatively I can convert the table to a pandas dataframe by calling to_pandas() and then use to_feather() on the dataframe. The resulting file on disk is now only 5.5 mb and naturally writes and opens in a flash.
>>>>>> 
>>>>>> I feel like this has something to do with partitions and how the table is being structured coming from the dataset API that is then preserved when writing to a file.
>>>>>> 
>>>>>> Does anyone know why this would be the case and how to achieve the same outcome as done with the intermediate step by converting to pandas.
>>>>>> 
>>>>>> Kind regards
>>>>>> Nikhil Makan
>>>>>> 
>>> 
> 
> 
> -- 
> Cedric Yau
> cedricyau@gmail.com

Re: [Python] - Dataset- Write Table to Feather

Posted by Cedric Yau <ce...@gmail.com>.
If each file is just one row, then you might be better off writing the
values out as a Tab Separate Values file.  Those can all be concatenated
together.

The data is also small enough to transmit via a queue.  I'm primarily AWS
but Azure Queue Storage appears to be the right service:
How to use Azure Queue Storage from Python | Microsoft Learn
<https://learn.microsoft.com/en-us/azure/storage/queues/storage-python-how-to-use-queue-storage?tabs=python%2Cenvironment-variable-windows>

This would allow you to gather results as they are ready instead of waiting
until the end.

Cedric

On Sun, Oct 9, 2022 at 10:27 PM Nikhil Makan <ni...@gmail.com>
wrote:

> That's it! Thanks David.
>
> to_table().column(0).num_chunks = to_table().num_rows, therefore
> combine_chunks() merged them all into one.
>
> However I need to unpack the finer details of this a bit more:
>
>    - Given I have over 100k simulations producing these 'single row'
>    files. What's the best way to handle this afterwards or is there a better
>    way to store this from the start. The simulations are all running in
>    parallel so writing to the same file is challenging and I don't want to go
>    down the route of implementing a database of some sort. Would storing them
>    as uncompressed arrow files improve performance when reading in with the
>    dataset API or is there another more efficient way to combine them?
>    - What is optimal for chunks? Naturally a chunk for each row is not
>    efficient. Leaning on some knowledge I have with Spark the idea is to
>    partition your data so it can be spread across the number of nodes in the
>    cluster, to few partitions meant an under utilised cluster. What should be
>    done with chunks and do we have control over setting this?
>
> Kind regards
> Nikhil Makan
>
>
> On Mon, Oct 10, 2022 at 2:21 PM David Li <li...@apache.org> wrote:
>
>> I would guess that because the 187mb file is generated from 100,000+
>> files, and each input file is one row, you are hitting basically a
>> pathological case for Dataset/Arrow. Namely, the Dataset code isn't going
>> to consolidate input batches together, and so each input batch (= 1 row) is
>> making it into the output file. And there's some metadata per batch (=per
>> row!), inflating the storage requirements, and completely inhibiting
>> compression. (You're running LZ4 on each individual value in each row!)
>>
>> To check this, can you call combine_chunks() [1] after to_table, before
>> write_feather? This will combine the record batches (well, technically,
>> chunks of the ChunkedArrays) into a single record batch, and I would guess
>> you'll end up with something similar to the to_pandas().to_feather() case
>> (without bouncing through Pandas).
>>
>> You could also check this with to_table().column(0).num_chunks (I'd
>> expect this would be equal to to_table().num_rows).
>>
>> [1]:
>> https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.combine_chunks
>>
>> On Sun, Oct 9, 2022, at 21:12, Nikhil Makan wrote:
>>
>> Hi Will,
>>
>> For clarity the simulation files do get written to an Azure Blob Storage,
>> however to simplify things I have not tried to read the data directly from
>> the cloud storage. I have downloaded it first and then loaded it into a
>> dataset locally (which takes 12 mins). The process to produce the arrow
>> file for each simulation is done through pandas. The raw results of each
>> simulation gets read in using Pandas and written to an arrow file after a
>> simulation is complete. The 100 000+ files are therefore in the arrow
>> format using LZ4 compression.
>>
>> The table retrieved from the dataset object is then written to an arrow
>> file using feather.write_feather which again by default uses LZ4.
>>
>> Do you know if there is any way to inspect the two files or tables to get
>> more information about them as I can't understand how I have two arrow
>> files, one which is 187 mb the other 5.5mb however both with the same
>> compression LZ4, schema, shape and nbytes when read in.
>>
>> I can even read in the 187mb arrow file and write it back to disk and it
>> remains 187 mb, so there is definitely some property of the arrow table
>> that I am not seeing. It is not necessarily a property of just the file.
>>
>> Kind regards
>> Nikhil Makan
>>
>>
>> On Mon, Oct 10, 2022 at 1:38 PM Will Jones <wi...@gmail.com>
>> wrote:
>>
>> Hi Nikhil,
>>
>> To do this I have chosen to use the Dataset API which reads all the files
>> (this takes around 12 mins)
>>
>>
>> Given the number of files (100k+ right?) this does seem surprising.
>> Especially if you are using a remote filesystem like Azure or S3. Perhaps
>> you should consider having your application record the file paths of each
>> simulation result; then the Datasets API doesn't need to spend time
>> resolving all the file paths.
>>
>> The part I find strange is the size of the combined arrow file (187 mb)
>> on disk and time it takes to write and read that file (> 1min).
>>
>>
>> On the size, you should check which compression you are using. There are
>> some code paths that write uncompressed data by default and some code paths
>> that do. The Pandas to_feather() uses LZ4 by default; it's possible the
>> other way you are writing isn't. See IPC write options [1].
>>
>> On the time to read, that seems very long for local, and even for remote
>> (Azure?).
>>
>> Best,
>>
>> Will Jones
>>
>> [1]
>> https://arrow.apache.org/docs/python/generated/pyarrow.ipc.IpcWriteOptions.html#pyarrow.ipc.IpcWriteOptions
>>
>>
>> On Sun, Oct 9, 2022 at 5:08 PM Nikhil Makan <ni...@gmail.com>
>> wrote:
>>
>> Hi All,
>>
>> I have a situation where I am running a lot of simulations in parallel
>> (100k+). The results of each simulation get written to an arrow file. A
>> result file contains around 8 columns and 1 row.
>>
>> After all simulations have run I want to be able to merge all these files
>> up into a single arrow file.
>>
>>    - To do this I have chosen to use the Dataset API which reads all the
>>    files (this takes around 12 mins)
>>    - I then call to_table() on the dataset to bring it into memory
>>    - Finally I write the table to an arrow file using
>>    feather.write_feather()
>>
>> Unfortunately I do not have a reproducible example, but hopefully the
>> answer to this question won't require one.
>>
>> The part I find strange is the size of the combined arrow file (187 mb)
>> on disk and time it takes to write and read that file (> 1min).
>>
>> Alternatively I can convert the table to a pandas dataframe by calling
>> to_pandas() and then use to_feather() on the dataframe. The resulting file
>> on disk is now only 5.5 mb and naturally writes and opens in a flash.
>>
>> I feel like this has something to do with partitions and how the table is
>> being structured coming from the dataset API that is then preserved when
>> writing to a file.
>>
>> Does anyone know why this would be the case and how to achieve the same
>> outcome as done with the intermediate step by converting to pandas.
>>
>> Kind regards
>> Nikhil Makan
>>
>>
>>

-- 
Cedric Yau
cedricyau@gmail.com

Re: [Python] - Dataset- Write Table to Feather

Posted by Nikhil Makan <ni...@gmail.com>.
That's it! Thanks David.

to_table().column(0).num_chunks = to_table().num_rows, therefore
combine_chunks() merged them all into one.

However I need to unpack the finer details of this a bit more:

   - Given I have over 100k simulations producing these 'single row' files.
   What's the best way to handle this afterwards or is there a better way to
   store this from the start. The simulations are all running in parallel so
   writing to the same file is challenging and I don't want to go down the
   route of implementing a database of some sort. Would storing them as
   uncompressed arrow files improve performance when reading in with the
   dataset API or is there another more efficient way to combine them?
   - What is optimal for chunks? Naturally a chunk for each row is not
   efficient. Leaning on some knowledge I have with Spark the idea is to
   partition your data so it can be spread across the number of nodes in the
   cluster, to few partitions meant an under utilised cluster. What should be
   done with chunks and do we have control over setting this?

Kind regards
Nikhil Makan


On Mon, Oct 10, 2022 at 2:21 PM David Li <li...@apache.org> wrote:

> I would guess that because the 187mb file is generated from 100,000+
> files, and each input file is one row, you are hitting basically a
> pathological case for Dataset/Arrow. Namely, the Dataset code isn't going
> to consolidate input batches together, and so each input batch (= 1 row) is
> making it into the output file. And there's some metadata per batch (=per
> row!), inflating the storage requirements, and completely inhibiting
> compression. (You're running LZ4 on each individual value in each row!)
>
> To check this, can you call combine_chunks() [1] after to_table, before
> write_feather? This will combine the record batches (well, technically,
> chunks of the ChunkedArrays) into a single record batch, and I would guess
> you'll end up with something similar to the to_pandas().to_feather() case
> (without bouncing through Pandas).
>
> You could also check this with to_table().column(0).num_chunks (I'd expect
> this would be equal to to_table().num_rows).
>
> [1]:
> https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.combine_chunks
>
> On Sun, Oct 9, 2022, at 21:12, Nikhil Makan wrote:
>
> Hi Will,
>
> For clarity the simulation files do get written to an Azure Blob Storage,
> however to simplify things I have not tried to read the data directly from
> the cloud storage. I have downloaded it first and then loaded it into a
> dataset locally (which takes 12 mins). The process to produce the arrow
> file for each simulation is done through pandas. The raw results of each
> simulation gets read in using Pandas and written to an arrow file after a
> simulation is complete. The 100 000+ files are therefore in the arrow
> format using LZ4 compression.
>
> The table retrieved from the dataset object is then written to an arrow
> file using feather.write_feather which again by default uses LZ4.
>
> Do you know if there is any way to inspect the two files or tables to get
> more information about them as I can't understand how I have two arrow
> files, one which is 187 mb the other 5.5mb however both with the same
> compression LZ4, schema, shape and nbytes when read in.
>
> I can even read in the 187mb arrow file and write it back to disk and it
> remains 187 mb, so there is definitely some property of the arrow table
> that I am not seeing. It is not necessarily a property of just the file.
>
> Kind regards
> Nikhil Makan
>
>
> On Mon, Oct 10, 2022 at 1:38 PM Will Jones <wi...@gmail.com>
> wrote:
>
> Hi Nikhil,
>
> To do this I have chosen to use the Dataset API which reads all the files
> (this takes around 12 mins)
>
>
> Given the number of files (100k+ right?) this does seem surprising.
> Especially if you are using a remote filesystem like Azure or S3. Perhaps
> you should consider having your application record the file paths of each
> simulation result; then the Datasets API doesn't need to spend time
> resolving all the file paths.
>
> The part I find strange is the size of the combined arrow file (187 mb) on
> disk and time it takes to write and read that file (> 1min).
>
>
> On the size, you should check which compression you are using. There are
> some code paths that write uncompressed data by default and some code paths
> that do. The Pandas to_feather() uses LZ4 by default; it's possible the
> other way you are writing isn't. See IPC write options [1].
>
> On the time to read, that seems very long for local, and even for remote
> (Azure?).
>
> Best,
>
> Will Jones
>
> [1]
> https://arrow.apache.org/docs/python/generated/pyarrow.ipc.IpcWriteOptions.html#pyarrow.ipc.IpcWriteOptions
>
>
> On Sun, Oct 9, 2022 at 5:08 PM Nikhil Makan <ni...@gmail.com>
> wrote:
>
> Hi All,
>
> I have a situation where I am running a lot of simulations in parallel
> (100k+). The results of each simulation get written to an arrow file. A
> result file contains around 8 columns and 1 row.
>
> After all simulations have run I want to be able to merge all these files
> up into a single arrow file.
>
>    - To do this I have chosen to use the Dataset API which reads all the
>    files (this takes around 12 mins)
>    - I then call to_table() on the dataset to bring it into memory
>    - Finally I write the table to an arrow file using
>    feather.write_feather()
>
> Unfortunately I do not have a reproducible example, but hopefully the
> answer to this question won't require one.
>
> The part I find strange is the size of the combined arrow file (187 mb) on
> disk and time it takes to write and read that file (> 1min).
>
> Alternatively I can convert the table to a pandas dataframe by calling
> to_pandas() and then use to_feather() on the dataframe. The resulting file
> on disk is now only 5.5 mb and naturally writes and opens in a flash.
>
> I feel like this has something to do with partitions and how the table is
> being structured coming from the dataset API that is then preserved when
> writing to a file.
>
> Does anyone know why this would be the case and how to achieve the same
> outcome as done with the intermediate step by converting to pandas.
>
> Kind regards
> Nikhil Makan
>
>
>

Re: [Python] - Dataset- Write Table to Feather

Posted by David Li <li...@apache.org>.
I would guess that because the 187mb file is generated from 100,000+ files, and each input file is one row, you are hitting basically a pathological case for Dataset/Arrow. Namely, the Dataset code isn't going to consolidate input batches together, and so each input batch (= 1 row) is making it into the output file. And there's some metadata per batch (=per row!), inflating the storage requirements, and completely inhibiting compression. (You're running LZ4 on each individual value in each row!)

To check this, can you call combine_chunks() [1] after to_table, before write_feather? This will combine the record batches (well, technically, chunks of the ChunkedArrays) into a single record batch, and I would guess you'll end up with something similar to the to_pandas().to_feather() case (without bouncing through Pandas). 

You could also check this with to_table().column(0).num_chunks (I'd expect this would be equal to to_table().num_rows).

[1]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.combine_chunks

On Sun, Oct 9, 2022, at 21:12, Nikhil Makan wrote:
> Hi Will,
> 
> For clarity the simulation files do get written to an Azure Blob Storage, however to simplify things I have not tried to read the data directly from the cloud storage. I have downloaded it first and then loaded it into a dataset locally (which takes 12 mins). The process to produce the arrow file for each simulation is done through pandas. The raw results of each simulation gets read in using Pandas and written to an arrow file after a simulation is complete. The 100 000+ files are therefore in the arrow format using LZ4 compression.
> 
> The table retrieved from the dataset object is then written to an arrow file using feather.write_feather which again by default uses LZ4.
> 
> Do you know if there is any way to inspect the two files or tables to get more information about them as I can't understand how I have two arrow files, one which is 187 mb the other 5.5mb however both with the same compression LZ4, schema, shape and nbytes when read in.
> 
> I can even read in the 187mb arrow file and write it back to disk and it remains 187 mb, so there is definitely some property of the arrow table that I am not seeing. It is not necessarily a property of just the file.
> 
> Kind regards
> Nikhil Makan
> 
> 
> On Mon, Oct 10, 2022 at 1:38 PM Will Jones <wi...@gmail.com> wrote:
>> Hi Nikhil,
>> 
>>> To do this I have chosen to use the Dataset API which reads all the files (this takes around 12 mins)
>> 
>> Given the number of files (100k+ right?) this does seem surprising. Especially if you are using a remote filesystem like Azure or S3. Perhaps you should consider having your application record the file paths of each simulation result; then the Datasets API doesn't need to spend time resolving all the file paths.
>> 
>>> The part I find strange is the size of the combined arrow file (187 mb) on disk and time it takes to write and read that file (> 1min).
>> 
>> On the size, you should check which compression you are using. There are some code paths that write uncompressed data by default and some code paths that do. The Pandas to_feather() uses LZ4 by default; it's possible the other way you are writing isn't. See IPC write options [1].
>> 
>> On the time to read, that seems very long for local, and even for remote (Azure?).
>> 
>> Best,
>> 
>> Will Jones
>> 
>> [1] https://arrow.apache.org/docs/python/generated/pyarrow.ipc.IpcWriteOptions.html#pyarrow.ipc.IpcWriteOptions
>>  
>> 
>> On Sun, Oct 9, 2022 at 5:08 PM Nikhil Makan <ni...@gmail.com> wrote:
>>> Hi All,
>>> 
>>> I have a situation where I am running a lot of simulations in parallel (100k+). The results of each simulation get written to an arrow file. A result file contains around 8 columns and 1 row.
>>> 
>>> After all simulations have run I want to be able to merge all these files up into a single arrow file.
>>>  * To do this I have chosen to use the Dataset API which reads all the files (this takes around 12 mins)
>>>  * I then call to_table() on the dataset to bring it into memory
>>>  * Finally I write the table to an arrow file using feather.write_feather()
>>> Unfortunately I do not have a reproducible example, but hopefully the answer to this question won't require one.
>>> 
>>> The part I find strange is the size of the combined arrow file (187 mb) on disk and time it takes to write and read that file (> 1min).
>>> 
>>> Alternatively I can convert the table to a pandas dataframe by calling to_pandas() and then use to_feather() on the dataframe. The resulting file on disk is now only 5.5 mb and naturally writes and opens in a flash.
>>> 
>>> I feel like this has something to do with partitions and how the table is being structured coming from the dataset API that is then preserved when writing to a file.
>>> 
>>> Does anyone know why this would be the case and how to achieve the same outcome as done with the intermediate step by converting to pandas.
>>> 
>>> Kind regards
>>> Nikhil Makan
>>> 

Re: [Python] - Dataset- Write Table to Feather

Posted by Nikhil Makan <ni...@gmail.com>.
Hi Will,

For clarity the simulation files do get written to an Azure Blob Storage,
however to simplify things I have not tried to read the data directly from
the cloud storage. I have downloaded it first and then loaded it into a
dataset locally (which takes 12 mins). The process to produce the arrow
file for each simulation is done through pandas. The raw results of each
simulation gets read in using Pandas and written to an arrow file after a
simulation is complete. The 100 000+ files are therefore in the arrow
format using LZ4 compression.

The table retrieved from the dataset object is then written to an arrow
file using feather.write_feather which again by default uses LZ4.

Do you know if there is any way to inspect the two files or tables to get
more information about them as I can't understand how I have two arrow
files, one which is 187 mb the other 5.5mb however both with the same
compression LZ4, schema, shape and nbytes when read in.

I can even read in the 187mb arrow file and write it back to disk and it
remains 187 mb, so there is definitely some property of the arrow table
that I am not seeing. It is not necessarily a property of just the file.

Kind regards
Nikhil Makan


On Mon, Oct 10, 2022 at 1:38 PM Will Jones <wi...@gmail.com> wrote:

> Hi Nikhil,
>
> To do this I have chosen to use the Dataset API which reads all the files
>> (this takes around 12 mins)
>>
>
> Given the number of files (100k+ right?) this does seem surprising.
> Especially if you are using a remote filesystem like Azure or S3. Perhaps
> you should consider having your application record the file paths of each
> simulation result; then the Datasets API doesn't need to spend time
> resolving all the file paths.
>
> The part I find strange is the size of the combined arrow file (187 mb) on
>> disk and time it takes to write and read that file (> 1min).
>>
>
> On the size, you should check which compression you are using. There are
> some code paths that write uncompressed data by default and some code paths
> that do. The Pandas to_feather() uses LZ4 by default; it's possible the
> other way you are writing isn't. See IPC write options [1].
>
> On the time to read, that seems very long for local, and even for remote
> (Azure?).
>
> Best,
>
> Will Jones
>
> [1]
> https://arrow.apache.org/docs/python/generated/pyarrow.ipc.IpcWriteOptions.html#pyarrow.ipc.IpcWriteOptions
>
>
> On Sun, Oct 9, 2022 at 5:08 PM Nikhil Makan <ni...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I have a situation where I am running a lot of simulations in parallel
>> (100k+). The results of each simulation get written to an arrow file. A
>> result file contains around 8 columns and 1 row.
>>
>> After all simulations have run I want to be able to merge all these files
>> up into a single arrow file.
>>
>>    - To do this I have chosen to use the Dataset API which reads all the
>>    files (this takes around 12 mins)
>>    - I then call to_table() on the dataset to bring it into memory
>>    - Finally I write the table to an arrow file using
>>    feather.write_feather()
>>
>> Unfortunately I do not have a reproducible example, but hopefully the
>> answer to this question won't require one.
>>
>> The part I find strange is the size of the combined arrow file (187 mb)
>> on disk and time it takes to write and read that file (> 1min).
>>
>> Alternatively I can convert the table to a pandas dataframe by calling
>> to_pandas() and then use to_feather() on the dataframe. The resulting file
>> on disk is now only 5.5 mb and naturally writes and opens in a flash.
>>
>> I feel like this has something to do with partitions and how the table is
>> being structured coming from the dataset API that is then preserved when
>> writing to a file.
>>
>> Does anyone know why this would be the case and how to achieve the same
>> outcome as done with the intermediate step by converting to pandas.
>>
>> Kind regards
>> Nikhil Makan
>>
>>

Re: [Python] - Dataset- Write Table to Feather

Posted by Will Jones <wi...@gmail.com>.
Hi Nikhil,

To do this I have chosen to use the Dataset API which reads all the files
> (this takes around 12 mins)
>

Given the number of files (100k+ right?) this does seem surprising.
Especially if you are using a remote filesystem like Azure or S3. Perhaps
you should consider having your application record the file paths of each
simulation result; then the Datasets API doesn't need to spend time
resolving all the file paths.

The part I find strange is the size of the combined arrow file (187 mb) on
> disk and time it takes to write and read that file (> 1min).
>

On the size, you should check which compression you are using. There are
some code paths that write uncompressed data by default and some code paths
that do. The Pandas to_feather() uses LZ4 by default; it's possible the
other way you are writing isn't. See IPC write options [1].

On the time to read, that seems very long for local, and even for remote
(Azure?).

Best,

Will Jones

[1]
https://arrow.apache.org/docs/python/generated/pyarrow.ipc.IpcWriteOptions.html#pyarrow.ipc.IpcWriteOptions


On Sun, Oct 9, 2022 at 5:08 PM Nikhil Makan <ni...@gmail.com> wrote:

> Hi All,
>
> I have a situation where I am running a lot of simulations in parallel
> (100k+). The results of each simulation get written to an arrow file. A
> result file contains around 8 columns and 1 row.
>
> After all simulations have run I want to be able to merge all these files
> up into a single arrow file.
>
>    - To do this I have chosen to use the Dataset API which reads all the
>    files (this takes around 12 mins)
>    - I then call to_table() on the dataset to bring it into memory
>    - Finally I write the table to an arrow file using
>    feather.write_feather()
>
> Unfortunately I do not have a reproducible example, but hopefully the
> answer to this question won't require one.
>
> The part I find strange is the size of the combined arrow file (187 mb) on
> disk and time it takes to write and read that file (> 1min).
>
> Alternatively I can convert the table to a pandas dataframe by calling
> to_pandas() and then use to_feather() on the dataframe. The resulting file
> on disk is now only 5.5 mb and naturally writes and opens in a flash.
>
> I feel like this has something to do with partitions and how the table is
> being structured coming from the dataset API that is then preserved when
> writing to a file.
>
> Does anyone know why this would be the case and how to achieve the same
> outcome as done with the intermediate step by converting to pandas.
>
> Kind regards
> Nikhil Makan
>
>