You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@arrow.apache.org by "svilupp (via GitHub)" <gi...@apache.org> on 2023/03/09 10:42:37 UTC

[GitHub] [arrow-julia] svilupp opened a new issue, #393: Benchmark of Arrow.jl vs Pyarrow (/Polars)

svilupp opened a new issue, #393:
URL: https://github.com/apache/arrow-julia/issues/393

   First of all, thank you for this amazing package!
   
   Recently, I've been loading a lot of large files and it felt like Arrow.jl loading times are greater than Python. I wanted to quantify this feeling, so I hacked up a rough benchmark (code + results below).
   
   **Observations**
   - Polars is amazing, especially when combined with PyArrow (`use_pyarrow=True` which improves the benchmarks quite a lot!)
   - With compression enabled, Arrow.jl becomes the slowest. 
   - On uncompressed data, Arrow.jl is ahead only of Pandas, because it creates a copy in memory (not mmaped). With strings, Polars can be >30times faster than Arrow.jl (which, unsurprisingly, to the number of threads on the machine)
   - Based on profiling, the biggest slowdowns for compressed files are 1) no threading for "common" workflows (Arrow.jl benchmarks below utilized only a single thread despite >30 available) and 2) wasteful resizing of buffer when decoding (we know the exact size of the buffer needed as per IPC specs! In some cases it was as much time as the decompression itself)
   - For large string vectors, we pay a huge penalty to the GC (on my private data, I see 50-70% of time spent in GC)
   - Just with a few tweaks, Arrow.jl becomes competitive with other libraries (see benchmark with 32 partitions)
   
   **Proposals**
   - Improve documentation to be very explicit that we need to _partition data_ to use threading (both for reading and writing). There is reference, but it took many hours and understanding the code base for that to click, so the first-time users will not get it
   - Enable threaded and initialized decompression ([similar to what we do we compressors](https://github.com/apache/arrow-julia/blob/63d2c9d3ca4539a0ea831ae8ecafa71b051d475d/src/Arrow.jl#L74), by pre-initializing one compressor for each thread). A the moment, each decompressor is [costly re-initialized from the type for each buffer](https://github.com/apache/arrow-julia/blob/63d2c9d3ca4539a0ea831ae8ecafa71b051d475d/src/table.jl#L505)
   - PR to TranscodingStreams to enable mutating decompression (we can initialize the correctly-sized output buffer thanks to IPC specs)
   - Implement InlineStrings for large string vectors (ala CSV.jl, I see 2 potential ways with different levels of hackiness), there is already an issue: #304 (It could become an extension to lighten the TTFX)
   - Change design to allow threading over columns (not multiple RecordBatches, which rarely exist). We would need to separate "parsing" from materializing -- clearly that's what PyArrow is doing
   - ?squash bugs - there were several segfaults and I think I know where it's coming from. I'll create a separate issue
   
   
   **Benchmarking results**
   
   Machine: 
   - m5dn.8xl, NVME local drive, 32 vCPU, 96GB RAM
   - Julia 1.8.5 / Arrow 2.4.3
   - Python 3.10
   
   **Task 1:** 10x count nonmissing elements in the first column of a table
   Data: 2 columns of 5K-long strings each, 10% of data missing, 10K rows
   Timings: (ordered by Uncompressed, LZ4, ZSTD)
   - Pandas: 1.2s, 1.5s, 1.6s
   - Polars: 5ms, 1.5s, 2.05s
   - Polars+PyArrow: 4.8ms, 0.26s, 0.42s
   - Arrow+32Threads: 0.17s, 2.3s, 1.6s
   - Arrow+1Thread: 0.2, 2.25s, 1.9s
   
   Data: 32 partitions (!), 2 columns of 5K-long strings each, 10% of data missing, 10K rows
   Timings: (ordered by Uncompressed, LZ4, ZSTD)
   - Pandas: 1.2s, 1.0s, 1.2s
   - Polars: 9ms, 2.1s, 2.8s
   - Polars+PyArrow: 1.1s, 1.3s, 1.5s
   - Arrow+32Threads: 0.22s, 0.44s, 0.4s
   (Arrow.jl timing also benefits from a quick fix to TranscodingStreams)
   
   **Task 2:** 10x mean value of Int column in the first column of a table
   Data: 10 columns, Int64, 10M rows
   Timings: (ordered by Uncompressed, LZ4, ZSTD)
   - Pandas: 5.4s, 5.9s, 5.84s
   - Polars: 0.23s, 8s, 8.1s
   - Polars+PyArrow: 0.2s, 0.7s, 0.6s
   - Arrow+32Threads: 0.1s, 17.2s, 6.1s
   - Arrow+1Thread: 0.1s, 16.3s, 6.3s
   
   Data: 32 partitions (!), 10 columns, Int64, 10M rows
   Timings: (ordered by Uncompressed, LZ4, ZSTD)
   - Pandas: 5.6, 2.8s, 2.6s
   - Polars: 0.23s, 12.8s, 12.6s
   - Polars+PyArrow: 6.5s, 6.5s, 6.4s
   - Arrow+32Threads: 0.1s, 1.2s, 0.7s
   (Arrow.jl timing also benefits from a quick fix to TranscodingStreams)
   
   
   **Benchmark details** 
   
   benchmark.jl
   ```
   # Test case 1: 32 Threads available
   fn = "data_raw/df_10K_string5K_missing"
   @time read_test(fn * "_unc.arrow", 10)
   # 0.164394 seconds (92.02 k allocations: 438.760 MiB, 16.59% gc time)
   @time read_test(fn * "_lz4.arrow", 10)
   # 2.289358 seconds (98.04 k allocations: 2.107 GiB, 18.83% gc time, 0.36% compilation time)
   @time read_test(fn * "_zstd.arrow", 10)
   # 1.581160 seconds (92.60 k allocations: 1.680 GiB, 3.89% gc time)
   
   # Test case 1: 1 Thread available
   fn = "data_raw/df_10K_string5K_missing"
   @time read_test(fn * "_unc.arrow", 10)
   #  0.199866 seconds (91.94 k allocations: 438.758 MiB)
   @time read_test(fn * "_lz4.arrow", 10)
   # 2.250688 seconds (92.92 k allocations: 2.106 GiB, 18.67% gc time)
   @time read_test(fn * "_zstd.arrow", 10)
   # 1.908512 seconds (92.62 k allocations: 1.680 GiB, 20.47% gc time)
   
   # Test case 1: 32 Threads available, arrow file has 32 partitions (+ minor tweak to transcoding function)
   fn = "data_raw/df_10K_string5K_missing"
   @time read_test(fn * "_unc.arrow", 10; colname=:x1)
   # 0.224833 seconds (129.90 k allocations: 442.646 MiB)
   @time read_test(fn * "_lz4.arrow", 10; colname=:x1)
   # 0.435552 seconds (148.28 k allocations: 1.276 GiB)
   @time read_test(fn * "_zstd.arrow", 10; colname=:x1)
   # 0.401670 seconds (141.81 k allocations: 1.276 GiB, 13.32% gc time
   
   
   # Test case 2: 32 Threads available
   fn = "data_raw/df_10M_col10"
   @time read_test_mean(fn * "_unc.arrow", 10; colname=:x1)
   #  0.107945 seconds (6.34 k allocations: 347.328 KiB)
   @time read_test_mean(fn * "_lz4.arrow", 10; colname=:x1)
   # 17.237241 seconds (29.70 k allocations: 14.903 GiB, 10.65% gc time, 0.10% compilation time)
   @time read_test_mean(fn * "_zstd.arrow", 10; colname=:x1)
   # 6.157455 seconds (7.65 k allocations: 14.902 GiB, 10.48% gc time)
   
   # Test case 2: 1 Thread available
   fn = "data_raw/df_10M_col10"
   @time read_test_mean(fn * "_unc.arrow", 10; colname=:x1)
   # 0.101603 seconds (6.28 k allocations: 345.750 KiB)
   @time read_test_mean(fn * "_lz4.arrow", 10; colname=:x1)
   # 16.322216 seconds (28.00 k allocations: 14.903 GiB, 6.25% gc time, 0.10% compilation time)
   @time read_test_mean(fn * "_zstd.arrow", 10; colname=:x1)
   # 6.311729 seconds (7.58 k allocations: 14.902 GiB, 13.17% gc time)
   
   # Test case 2: 32 Threads available, arrow file has 32 partitions (+ minor tweak to transcoding function)
   fn = "data_raw/df_10M_col10"
   @time read_test_mean(fn * "_unc.arrow", 10; colname=:x1)
   # 0.118847 seconds (161.34 k allocations: 8.437 MiB)
   @time read_test_mean(fn * "_lz4.arrow", 10; colname=:x1)
   # 1.156759 seconds (200.97 k allocations: 7.460 GiB)
   @time read_test_mean(fn * "_zstd.arrow", 10; colname=:x1)
   # 0.655502 seconds (191.44 k allocations: 7.460 GiB)
   ```
   
   
   benchmark.py (all 32 threads active, 1 partition/RecordBarch in the arrow file)
   ```
   # accidentally overwritten... It's in the summaries at the top, but I can re-run it if interesting.
   ```
   
   
   benchmark_partitioned.py (all 32 threads active, 32 partitions/RecordBatches in arrow files)
   ```
   ### Test case 1: Strings (all 32 threads, 32 partitions)
   fn1="df_10K_string5K_missing_unc.arrow"
   fn2="df_10K_string5K_missing_lz4.arrow"
   fn3="df_10K_string5K_missing_zstd.arrow"
   #
   %time read_test_pandas(fn1,10)
   %time read_test_pandas(fn2,10)
   %time read_test_pandas(fn3,10)
   #
   %time read_test_polars(fn1,10)
   %time read_test_polars(fn2,10)
   %time read_test_polars(fn3,10)
   #
   %time read_test_polars_pyarrow(fn1,10)
   %time read_test_polars_pyarrow(fn2,10)
   %time read_test_polars_pyarrow(fn3,10)
   
   #
   CPU times: user 349 ms, sys: 898 ms, total: 1.25 s
   Wall time: 1.24 s
   CPU times: user 519 ms, sys: 649 ms, total: 1.17 s
   Wall time: 1.03 s
   CPU times: user 1.11 s, sys: 534 ms, total: 1.64 s
   Wall time: 1.23 s
   #
   CPU times: user 6.06 ms, sys: 13.9 ms, total: 20 ms
   Wall time: 8.99 ms
   CPU times: user 682 ms, sys: 1.42 s, total: 2.11 s
   Wall time: 2.09 s
   CPU times: user 1.65 s, sys: 1.19 s, total: 2.85 s
   Wall time: 2.84 s
   #
   CPU times: user 551 ms, sys: 744 ms, total: 1.29 s
   Wall time: 1.14 s
   CPU times: user 764 ms, sys: 833 ms, total: 1.6 s
   Wall time: 1.27 s
   CPU times: user 1.37 s, sys: 791 ms, total: 2.16 s
   Wall time: 1.54 s
   
   ### Test case 2: Integers (all 32 threads, 32 partitions)
   fn1="df_10M_col10_unc.arrow"
   fn2="df_10M_col10_lz4.arrow"
   fn3="df_10M_col10_zstd.arrow"
   #
   %time read_test_mean_pandas(fn1,10)
   %time read_test_mean_pandas(fn2,10)
   %time read_test_mean_pandas(fn3,10)
   #
   %time read_test_mean_polars(fn1,10)
   %time read_test_mean_polars(fn2,10)
   %time read_test_mean_polars(fn3,10)
   #
   %time read_test_mean_polars_pyarrow(fn1,10)
   %time read_test_mean_polars_pyarrow(fn2,10)
   %time read_test_mean_polars_pyarrow(fn3,10)
   
   #
   CPU times: user 2.51 s, sys: 8.85 s, total: 11.4 s
   Wall time: 5.62 s
   CPU times: user 4.1 s, sys: 7.29 s, total: 11.4 s
   Wall time: 2.84 s
   CPU times: user 4.28 s, sys: 5.44 s, total: 9.72 s
   Wall time: 2.61 s
   #
   CPU times: user 159 ms, sys: 87.1 ms, total: 247 ms
   Wall time: 232 ms
   CPU times: user 4.23 s, sys: 8.66 s, total: 12.9 s
   Wall time: 12.8 s
   CPU times: user 4.27 s, sys: 8.33 s, total: 12.6 s
   Wall time: 12.6 s
   #
   CPU times: user 2.53 s, sys: 4.06 s, total: 6.59 s
   Wall time: 6.58 s
   CPU times: user 4.25 s, sys: 4.91 s, total: 9.15 s
   Wall time: 6.49 s
   CPU times: user 4.33 s, sys: 4.33 s, total: 8.66 s
   Wall time: 6.43 s
   ```
   
   
   benchmark_setup.py
   ```
   !pip install pandas pyarrow polars pathlib
   from pathlib import Path
   import pandas as pd
   import polars as pl
   import pyarrow
   from pyarrow.feather import write_feather,read_feather
   
   # String isnull tests
   def read_test_pandas(fn,n):
     counter=0
     for i in range(n):
       counter+=pd.read_feather(fn).x1.notna().sum()
     return counter
     
   def read_test_polars_pyarrow(fn,n):
     counter=0
     for i in range(n):
       counter+=pl.read_ipc(fn,use_pyarrow=True)["x1"].is_not_null().sum()
     return counter
   
   def read_test_polars(fn,n):
     counter=0
     for i in range(n):
       counter+=pl.read_ipc(fn)["x1"].is_not_null().sum()
     return counter
   
   # length test to make sure _is_not_null is not cheating and reading just metadata
   def read_test_len_polars_pyarrow(fn,n):
     counter=0
     for i in range(n):
       counter+=pl.read_ipc(fn,use_pyarrow=True)["x1"].str.lengths().sum()
     return counter
   
   # Integer tests
   def read_test_mean_pandas(fn,n):
     counter=0
     for i in range(n):
       counter+=pd.read_feather(fn).x1.mean()
     return counter
   
   def read_test_mean_polars(fn,n):
     counter=0
     for i in range(n):
       counter+=pl.read_ipc(fn)["x1"].mean()
     return counter
   
   def read_test_mean_polars_pyarrow(fn,n):
     counter=0
     for i in range(n):
       counter+=pl.read_ipc(fn,use_pyarrow=True)["x1"].mean()
     return counter
   ```
   
   
   benchmark_setup.jl
   ```
   using Arrow
   using DataFramesMeta
   using DataFramesMeta: Tables
   using BenchmarkTools
   using Random
   
   # utility functions for generation
   function generate_numeric(N, ::Type{T}=Int; cols=10) where {T<:Number}
       df = DataFrame(rand(T, N, cols), :auto)
   end
   function generate_string(N; len=10, allowmissing=true)
       df = DataFrame(x1=map(x -> randstring(len), 1:N), x2=map(x -> randstring(len), 1:N))
       allowmissing && allowmissing!(df)
       return df
   end
   function add_rand_missing!(df, p=0.1)
       for col in names(df)
           mask_missing = rand(nrow(df)) .< p
           df[mask_missing, col] .= missing
       end
       df
   end
   function write_out_compressions(df, fn_base)
       Arrow.write(fn_base * "_unc.arrow", df; compress=nothing)
       Arrow.write(fn_base * "_lz4.arrow", df; compress=:lz4)
       Arrow.write(fn_base * "_zstd.arrow", df; compress=:zstd)
       return nothing
   end
   
   # utility functions for reading
   function read_test(fn, n; colname=:x1)
       counter = 0
       for i in 1:n
           t = Arrow.Table(fn)
           counter += sum(.!ismissing.(t[colname]))
       end
       return counter
   end
   function read_test_mean(fn, n; colname=:x1)
       counter = 0
       for i in 1:n
           t = Arrow.Table(fn)
           counter += mean((t[colname]))
       end
       return counter
   end
   
   # Test case 1: two columns with wide strings and some missing data
   fn = "data_raw/df_10K_string5K_missing"
   N=10_000
   df = generate_string(N; len=5000) |> add_rand_missing!
   write_out_compressions(df, fn); 
   
   # Test case 2: 10 columns of 10M Integers
   fn = "data_raw/df_10M_col10"
   N = 10_000_000
   df = generate_numeric(N, Int)
   write_out_compressions(df, fn);
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-julia] ericphanson commented on issue #393: Benchmark of Arrow.jl vs Pyarrow (/Polars)

Posted by "ericphanson (via GitHub)" <gi...@apache.org>.
ericphanson commented on issue #393:
URL: https://github.com/apache/arrow-julia/issues/393#issuecomment-1464942972

   Oops, misclick


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-julia] svilupp commented on issue #393: Benchmark of Arrow.jl vs Pyarrow (/Polars)

Posted by "svilupp (via GitHub)" <gi...@apache.org>.
svilupp commented on issue #393:
URL: https://github.com/apache/arrow-julia/issues/393#issuecomment-1465241253

   TL;DR The world makes sense again! Arrow.jl is the fastest reader now (except for one case). It took leveraging threads, skipping unnecessary resizing of buffers, some initialization, and adding support for InlineStrings (stack-allocated strings). Details and the implementation for testing are in [here](https://github.com/apache/arrow-julia/pull/399)
   
   Here are some learnings for those of you seeking Arrow.jl performance:
   - **Partition your data** (biggest benefit!)- Arrow.jl cannot leverage multiple threads unless it's reading/writing data split in multiple "chunks" with the same types/columns (split your table `tbl` with `Iterators.partition(Tables.rows(tbl),chunksize)`, DataFrames.jl 1.5.0 now supports `Iterators.partition(df)` directly!). Associated [PR and additional resources](https://github.com/apache/arrow-julia/pull/400)
   - **Inline your strings** - Creating / accessing strings is expensive. If your data has a lot of short strings (length <255), you can leverage `InlineStrings.jl` library (read @quinnj's [introductory blog post](https://quinnj.hashnode.dev/inlinestringsjl-fun-with-primitive-types-and-llvm-in-julia). The simplest use is to call `inlinestrings()` on your vector strings, but that means you had to materialize normal strings before. Alternatively, you can avoid this materialization by including [the following file](https://github.com/svilupp/arrow-julia/blob/arrow-turbo/src/inlinestrings.jl) from the PR mentioned above and running method `_inlinestrings()` over your table columns, eg, for table `t`, I would run `Arrow.columns(t) .= _inlinestrings.(Arrow.columns(t))`. (I prefixed it by _ because it has weird semantics, so I didn't want to hijack the original `inlinestrings` -> we simply swap the String type of the data wrapper `Arrow.List` without materializing any strings)`
   
   The rest is probably not suitable for most users, as it involves changing the package internals:
   - **Initialize your codecs** - When using compression (LZ4 or ZSTD), we need to initialize the codecs first. Hence, for repeated compression/decompression, TranscodingStreams.jl, the wrapper package for all compression work, advises to [pre-initialize your codecs and re-use them](https://juliaio.github.io/TranscodingStreams.jl/latest/examples/#Transcode-lots-of-strings-1). This was done for [compressors](https://github.com/apache/arrow-julia/blob/9b36c8b1ec9efbdc63009d1b8cd72ee705fc1711/src/Arrow.jl#L80), but not for decompressors. You can do the same for decompressors (PR was opened)
   - **Don't resize buffers** when you don't need to -  During decompression, `TranscodingStreams.jl.transcode()` function keeps resizing your output buffer in fixed increments until it's done. In my benchmarks above, we spent as much time resizing the buffers, as decompressing the data. Fortunately, Arrow IPC file format specification requires the size of each column to be saved in the metadata, so we actually know the size of the output buffer we need! I've upstreamed the [PR here](https://github.com/JuliaIO/TranscodingStreams.jl/pull/134)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-julia] baumgold commented on issue #393: Benchmark of Arrow.jl vs Pyarrow (/Polars)

Posted by "baumgold (via GitHub)" <gi...@apache.org>.
baumgold commented on issue #393:
URL: https://github.com/apache/arrow-julia/issues/393#issuecomment-1465065885

   I’m certainly interested! Thanks for this hard work, @svilupp !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-julia] quinnj commented on issue #393: Benchmark of Arrow.jl vs Pyarrow (/Polars)

Posted by "quinnj (via GitHub)" <gi...@apache.org>.
quinnj commented on issue #393:
URL: https://github.com/apache/arrow-julia/issues/393#issuecomment-1464942431

   Wow! Thanks for the detailed research/investigation/writeup @svilupp! I think most of what you mentioned all sounds like things we should indeed do. I'm currently bogged down in a few other projects for the next month or two, but I'm hoping to then have some time to contribute more meaningfully to the package as the list of issues has grown faster than I've been able to keep up. I appreciate all the effort you've put in here and would more than welcome PRs to improve things. I usually have time to review things regardless of what else is going on. Looking forward to improving things!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-julia] ericphanson closed issue #393: Benchmark of Arrow.jl vs Pyarrow (/Polars)

Posted by "ericphanson (via GitHub)" <gi...@apache.org>.
ericphanson closed issue #393: Benchmark of Arrow.jl vs Pyarrow (/Polars)
URL: https://github.com/apache/arrow-julia/issues/393


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-julia] svilupp commented on issue #393: Benchmark of Arrow.jl vs Pyarrow (/Polars)

Posted by "svilupp (via GitHub)" <gi...@apache.org>.
svilupp commented on issue #393:
URL: https://github.com/apache/arrow-julia/issues/393#issuecomment-1465018021

   I’ve already implemented most of the changes locally. I’ll post some benchmarks and learnings here tomorrow, and open the relevant PRs, if there is interest.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org