Posted to commits@arrow.apache.org by jo...@apache.org on 2021/12/03 18:53:11 UTC

[arrow-site] branch master updated: Cross-posted blog post with DuckDB (#168)

This is an automated email from the ASF dual-hosted git repository.

jonkeane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-site.git


The following commit(s) were added to refs/heads/master by this push:
     new 82364da  Cross-posted blog post with DuckDB (#168)
82364da is described below

commit 82364da1112734fc547763b9646d978e2c189d09
Author: Jonathan Keane <jk...@gmail.com>
AuthorDate: Fri Dec 3 10:53:06 2021 -0800

    Cross-posted blog post with DuckDB (#168)
    
    * Cross-posted blog post with DuckDB
    
    * Update _posts/2021-12-9-arrow-duckdb.md
    
    * Update _posts/2021-12-9-arrow-duckdb.md
    
    * fix the date
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    Co-authored-by: Neal Richardson <ne...@gmail.com>
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    Co-authored-by: Neal Richardson <ne...@gmail.com>
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    Co-authored-by: Neal Richardson <ne...@gmail.com>
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    Co-authored-by: David Li <li...@gmail.com>
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    Co-authored-by: David Li <li...@gmail.com>
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    * Update _posts/2021-12-3-arrow-duckdb.md
    
    Co-authored-by: Neal Richardson <ne...@gmail.com>
    Co-authored-by: David Li <li...@gmail.com>
---
 _posts/2021-12-3-arrow-duckdb.md | 387 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 387 insertions(+)

diff --git a/_posts/2021-12-3-arrow-duckdb.md b/_posts/2021-12-3-arrow-duckdb.md
new file mode 100644
index 0000000..baf4eab
--- /dev/null
+++ b/_posts/2021-12-3-arrow-duckdb.md
@@ -0,0 +1,387 @@
+---
+layout: post
+title: "DuckDB quacks Arrow: A zero-copy data integration between Arrow and DuckDB"
+date: "2021-12-3 00:00:00"
+author: Pedro Holanda, Jonathan Keane
+categories: [application]
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+*TLDR: The zero-copy integration between DuckDB and Apache Arrow allows for rapid analysis of larger than memory datasets in Python and R using either SQL or relational APIs.*
+
+This post is a collaboration with and cross-posted on [the DuckDB blog](https://duckdb.org/2021/12/03/duck-arrow.html).
+
+Part of [Apache Arrow](https://arrow.apache.org) is an in-memory data format optimized for analytical libraries. Like Pandas and R Dataframes, it uses a columnar data model. But the Arrow project contains more than just the format: The Arrow C++ library, which is accessible in Python, R, and Ruby via bindings, has additional features that allow you to compute efficiently on datasets. These additional features are on top of the implementation of the in-memory format described above. The d [...]
+
+[DuckDB](https://www.duckdb.org) is a new analytical data management system that is designed to run complex SQL queries within other processes. DuckDB has bindings for R and Python, among others. DuckDB can query Arrow datasets directly and stream query results back to Arrow. This integration allows users to query Arrow data using DuckDB's SQL Interface and API, while taking advantage of DuckDB's parallel vectorized execution engine, without requiring any extra data copying. Additionally [...]
+
+This integration is unique because it streams data between DuckDB and Arrow in both directions with zero copies, so you can compose a query that uses both together. This results in three main benefits:
+
+1. **Larger Than Memory Analysis:** Since both libraries support streaming query results, we can execute queries on data without fully loading it from disk; instead, we process one batch at a time. This allows us to execute queries on data that is bigger than memory.
+2. **Complex Data Types:** DuckDB can efficiently process complex data types that can be stored in Arrow vectors, including arbitrarily nested structs, lists, and maps (a small sketch follows this list).
+3. **Advanced Optimizer:** DuckDB's state-of-the-art optimizer can push down filters and projections directly into Arrow scans. As a result, only relevant columns and partitions will be read, allowing the system, for example, to take advantage of partition elimination in Parquet files. This significantly accelerates query execution.
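+
+To make the second point concrete, here is a minimal sketch of DuckDB querying an Arrow table that contains a list column and a struct column. The table, its column names, and the exact query are purely illustrative (they are not part of the benchmarks below):
+
+``` python
+import duckdb
+import pyarrow as pa
+
+# A small in-memory Arrow table with nested types (names are hypothetical)
+nested = pa.table({
+    "id": [1, 2],
+    "tags": [["a", "b"], ["c"]],
+    "location": [{"lat": 41.8, "lon": -87.6}, {"lat": 40.7, "lon": -74.0}],
+})
+
+con = duckdb.connect()
+
+# DuckDB's replacement scan finds the `nested` Python variable by name;
+# UNNEST expands the list column and dot notation reads the struct field
+print(con.execute(
+    "SELECT id, UNNEST(tags) AS tag, location.lat AS lat FROM nested"
+).fetch_arrow_table())
+```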
+
+For those who are just interested in benchmarks, you can jump ahead to the [benchmark section below](#benchmark-comparison).
+
+## Quick Tour
+Before diving into the details of the integration, this section provides a quick motivating example of how powerful and simple to use the DuckDB-Arrow integration is. With a few lines of code, you can start querying Arrow datasets. Say you want to analyze the infamous [NYC Taxi Dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) and figure out whether groups tip more or less than single riders.
+
+### R
+Both Arrow and DuckDB support dplyr pipelines for people more comfortable with using dplyr for their data analysis. The Arrow package includes two helper functions that allow us to pass data back and forth between Arrow and DuckDB (`to_duckdb()` and `to_arrow()`).
+This is especially useful in cases where something is supported in one of Arrow or DuckDB but not the other. For example, if you find a complex dplyr pipeline where the SQL translation doesn't work with DuckDB, use `to_arrow()` before the pipeline to use the Arrow engine. Or, if you need functions (e.g., windowed aggregates) that aren't yet implemented in Arrow, use `to_duckdb()` to use the DuckDB engine. All without paying any cost to (re)serialize the data when you pass it back and forth!
+
+
+```R
+library(duckdb)
+library(arrow)
+library(dplyr)
+
+# Open dataset using year,month folder partition
+ds <- arrow::open_dataset("nyc-taxi", partitioning = c("year", "month"))
+
+ds %>%
+  # Look only at 2015 on, where the number of passengers is positive, the trip distance is
+  # greater than a quarter mile, and where the fare amount is positive
+  filter(year > 2014 & passenger_count > 0 & trip_distance > 0.25 & fare_amount > 0) %>%
+  # Pass off to DuckDB
+  to_duckdb() %>%
+  group_by(passenger_count) %>%
+  mutate(tip_pct = tip_amount / fare_amount) %>%
+  summarise(
+    fare_amount = mean(fare_amount, na.rm = TRUE),
+    tip_amount = mean(tip_amount, na.rm = TRUE),
+    tip_pct = mean(tip_pct, na.rm = TRUE)
+  ) %>%
+  arrange(passenger_count) %>%
+  collect()
+```
+
+### Python
+The workflow in Python is as simple as it is in R. In this example we use DuckDB's Relational API.
+
+``` python
+import duckdb
+import pyarrow as pa
+import pyarrow.dataset as ds
+
+# Open dataset using year,month folder partition
+nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
+
+# We transform the nyc dataset into a DuckDB relation
+nyc = duckdb.arrow(nyc)
+
+# Run the same query as the R example above
+(
+    nyc.filter("year > 2014 & passenger_count > 0 & trip_distance > 0.25 & fare_amount > 0")
+    .aggregate("AVG(fare_amount), AVG(tip_amount), AVG(tip_amount / fare_amount) AS tip_pct",
+               "passenger_count")
+    .arrow()
+)
+```
+
+## DuckDB and Arrow: The Basics
+
+In this section, we will look at some basic examples of the code needed to read and output Arrow tables in both Python and R.
+
+#### Setup
+
+First we need to install DuckDB and Arrow. The installation process for both libraries in Python and R is shown below.
+```bash
+# Python Install
+pip install duckdb
+pip install pyarrow
+```
+
+``` R
+# R Install
+install.packages("duckdb")
+install.packages("arrow")
+```
+
+To execute the examples in this section, we need to download the following Parquet files (a sketch for downloading them follows the list):
+ - https://github.com/duckdb/duckdb-web/blob/master/_posts/data/integers.parquet?raw=true
+ - https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet
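+
+If you prefer to fetch them from Python, a minimal sketch looks like this (the local file names are just a convenient choice):
+
+``` python
+import urllib.request
+
+# Download the two example Parquet files used below
+urllib.request.urlretrieve(
+    "https://github.com/duckdb/duckdb-web/blob/master/_posts/data/integers.parquet?raw=true",
+    "integers.parquet",
+)
+urllib.request.urlretrieve(
+    "https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet",
+    "lineitemsf1.snappy.parquet",
+)
+```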
+
+
+#### Python
+
+There are two ways in Python to query data from Arrow:
+1. Through the Relational API
+```py
+import duckdb
+import pyarrow.parquet as pq
+
+# Reads Parquet File to an Arrow Table
+arrow_table = pq.read_table('integers.parquet')
+
+# Transforms Arrow Table -> DuckDB Relation
+rel_from_arrow = duckdb.arrow(arrow_table)
+
+# we can run a SQL query on this and print the result
+print(rel_from_arrow.query('arrow_table', 'SELECT SUM(data) FROM arrow_table WHERE data > 50').fetchone())
+
+# Transforms DuckDB Relation -> Arrow Table
+arrow_table_from_duckdb = rel_from_arrow.arrow()
+```
+
+2. By using replacement scans and querying the object directly with SQL:
+```py
+import duckdb
+import pyarrow.parquet as pq
+
+# Reads Parquet File to an Arrow Table
+arrow_table = pq.read_table('integers.parquet')
+
+# Gets Database Connection
+con = duckdb.connect()
+
+# we can run a SQL query on this and print the result
+print(con.execute('SELECT SUM(data) FROM arrow_table WHERE data > 50').fetchone())
+
+# Transforms Query Result from DuckDB to Arrow Table
+# We can directly read the arrow object through DuckDB's replacement scans.
+con.execute("SELECT * FROM arrow_table").fetch_arrow_table()
+```
+
+It is possible to transform both DuckDB Relations and Query Results back to Arrow.
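+
+If you would rather not rely on the replacement scan picking up the Python variable by name, you can also register the Arrow object explicitly as a named view. A small sketch (the view name `arrow_view` is arbitrary):
+
+``` python
+import duckdb
+import pyarrow.parquet as pq
+
+arrow_table = pq.read_table('integers.parquet')
+
+con = duckdb.connect()
+
+# Explicitly expose the Arrow table to DuckDB under a chosen name
+con.register("arrow_view", arrow_table)
+
+print(con.execute("SELECT SUM(data) FROM arrow_view WHERE data > 50").fetchone())
+```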
+
+#### R
+
+In R, you can interact with Arrow data in DuckDB by registering the table as a view (an alternative is to use dplyr as shown above).
+```r
+library(duckdb)
+library(arrow)
+library(dplyr)
+
+# Reads Parquet File to an Arrow Table
+arrow_table <- arrow::read_parquet("integers.parquet", as_data_frame = FALSE)
+
+# Gets Database Connection
+con <- dbConnect(duckdb::duckdb())
+
+# Registers arrow table as a DuckDB view
+arrow::to_duckdb(arrow_table, table_name = "arrow_table", con = con)
+
+# we can run a SQL query on this and print the result
+print(dbGetQuery(con, "SELECT SUM(data) FROM arrow_table WHERE data > 50"))
+
+# Transforms Query Result from DuckDB to Arrow Table
+result <- dbSendQuery(con, "SELECT * FROM arrow_table", arrow = TRUE)
+```
+
+### Streaming Data from/to Arrow
+In the previous section, we showed how to interact with Arrow tables. However, Arrow also allows users to work with data in a streaming fashion, either consuming it (e.g., from an Arrow Dataset) or producing it (e.g., returning a RecordBatchReader), and DuckDB is able to consume Datasets and produce RecordBatchReaders. This example uses the NYC Taxi Dataset, stored in Parquet files partitioned by year and month, which we can download through the Arrow R package:
+```R
+arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
+```
+
+
+#### Python
+```py
+import duckdb
+import pyarrow.dataset as ds
+
+# Reads dataset partitioned by year/month folders
+nyc_dataset = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
+
+# Gets Database Connection
+con = duckdb.connect()
+
+query = con.execute("SELECT * FROM nyc_dataset")
+# DuckDB's queries can now produce a Record Batch Reader
+record_batch_reader = query.fetch_record_batch()
+# Which means we can stream the whole query per batch.
+# This retrieves the first batch
+chunk = record_batch_reader.read_next_batch()
+```
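+
+Since `fetch_record_batch()` hands back a pyarrow `RecordBatchReader` (the `read_next_batch()` call above is a method of that class), a minimal sketch of consuming the rest of the stream could look like this:
+
+``` python
+# Stream the remaining batches one at a time
+for batch in record_batch_reader:
+    print(batch.num_rows)
+
+# Alternatively, record_batch_reader.read_all() materializes whatever is left
+# into a single Arrow Table (giving up the streaming benefit).
+```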
+#### R
+```r
+# Reads dataset partitioned by year/month folders
+nyc_dataset <- open_dataset("nyc-taxi/", partitioning = c("year", "month"))
+
+# Gets Database Connection
+con <- dbConnect(duckdb::duckdb())
+
+# Register the arrow dataset as a DuckDB view
+duckdb::duckdb_register_arrow(con, "nyc", nyc_dataset)
+
+res <- dbSendQuery(con, "SELECT * FROM nyc", arrow = TRUE)
+# DuckDB's queries can now produce a Record Batch Reader
+record_batch_reader <- duckdb::duckdb_fetch_record_batch(res)
+
+# Which means we can stream the whole query per batch.
+# This retrieves the first batch
+cur_batch <- record_batch_reader$read_next_batch()
+```
+
+The preceding R code shows in low-level detail how the data is streamed. The Arrow package provides the helper `to_arrow()`, which wraps this functionality and makes it easy to incorporate the streaming into a dplyr pipeline. [^1]
+
+## Benchmark Comparison
+
+Here we demonstrate with a simple benchmark the performance difference between querying Arrow datasets with DuckDB and querying them with Pandas.
+For both the projection and filter pushdown comparisons, we use Arrow tables, since Pandas is not capable of consuming Arrow stream objects.
+
+For the NYC Taxi benchmarks, we used the [scilens diamonds configuration](https://www.monetdb.org/wiki/Scilens-configuration-standard), and for the TPC-H benchmarks, we used an M1 MacBook Pro. In both cases, parallelism in DuckDB was used (which is now on by default).
+
+For the comparison with Pandas, note that DuckDB runs in parallel, while Pandas only supports single-threaded execution. Note also that we are comparing automatic optimizations: DuckDB's query optimizer can automatically push down filters and projections. Pandas does not support this automatically, but users can perform some of these column selections and filter pushdowns manually by specifying them in the `read_parquet()` call.
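+
+As an illustration, a minimal sketch of such a manual pushdown with `pandas.read_parquet()` (assuming the pyarrow engine, which accepts `columns` and `filters` arguments) might look like:
+
+``` python
+import pandas as pd
+
+# Manually "push down" the projection and a filter while reading the file
+df = pd.read_parquet(
+    "lineitemsf1.snappy.parquet",
+    engine="pyarrow",
+    columns=["l_extendedprice", "l_discount"],
+    filters=[("l_quantity", "<", 24)],
+)
+```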
+
+### Projection Pushdown
+
+In this example we run a simple aggregation on two columns of our lineitem table.
+
+``` python
+# DuckDB
+import duckdb
+import pyarrow.parquet as pq
+
+lineitem = pq.read_table('lineitemsf1.snappy.parquet')
+con = duckdb.connect()
+
+# Transforms Query Result from DuckDB to Arrow Table
+con.execute("""SELECT sum(l_extendedprice * l_discount) AS revenue
+                FROM
+                lineitem;""").fetch_arrow_table()
+
+```
+
+``` python
+# Pandas
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+arrow_table = pq.read_table('lineitemsf1.snappy.parquet')
+
+# Converts an Arrow table to a Dataframe
+df = arrow_table.to_pandas()
+
+# Runs aggregation
+res =  pd.DataFrame({'sum': [(df.l_extendedprice * df.l_discount).sum()]})
+
+# Creates an Arrow Table from a Dataframe
+new_table = pa.Table.from_pandas(res)
+
+```
+
+|    Name     | Time (s) |
+|-------------|---------:|
+| DuckDB  | 0.19    |
+| Pandas      | 2.13    |
+
+The lineitem table is composed of 16 columns; however, to execute this query only two columns, `l_extendedprice` and `l_discount`, are necessary. Since DuckDB can push down the projection of these columns, it is capable of executing this query about one order of magnitude faster than Pandas.
+
+### Filter Pushdown
+
+For our filter pushdown comparison we repeat the same aggregation used in the previous section, but add filters on three more columns (`l_shipdate`, `l_discount`, and `l_quantity`).
+
+``` python
+# DuckDB
+import duckdb
+import pyarrow.parquet as pq
+
+lineitem = pq.read_table('lineitemsf1.snappy.parquet')
+
+# Get database connection
+con = duckdb.connect()
+
+# Transforms Query Result from DuckDB to Arrow Table
+con.execute("""SELECT sum(l_extendedprice * l_discount) AS revenue
+        FROM
+            lineitem
+        WHERE
+            l_shipdate >= CAST('1994-01-01' AS date)
+            AND l_shipdate < CAST('1995-01-01' AS date)
+            AND l_discount BETWEEN 0.05
+            AND 0.07
+            AND l_quantity < 24; """).fetch_arrow_table()
+
+```
+
+``` python
+# Pandas
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+arrow_table = pq.read_table('lineitemsf1.snappy.parquet')
+
+# Converts the Arrow table to a Dataframe
+df = arrow_table.to_pandas()
+
+# Applies the filters on the Dataframe (no pushdown into the file read)
+filtered_df = df[
+        (df.l_shipdate >= "1994-01-01") &
+        (df.l_shipdate < "1995-01-01") &
+        (df.l_discount >= 0.05) &
+        (df.l_discount <= 0.07) &
+        (df.l_quantity < 24)]
+
+res =  pd.DataFrame({'sum': [(filtered_df.l_extendedprice * filtered_df.l_discount).sum()]})
+new_table = pa.Table.from_pandas(res)
+```
+
+|    Name     | Time (s) |
+|-------------|----------|
+| DuckDB  | 0.04    |
+| Pandas      | 2.29    |
+
+The difference between DuckDB and Pandas is now more drastic, with DuckDB about two orders of magnitude faster than Pandas. Again, since both the filter and projection are pushed down to Arrow, DuckDB reads less data than Pandas, which can't automatically perform this optimization.
+
+### Streaming
+
+As demonstrated before, DuckDB is capable of consuming and producing Arrow data in a streaming fashion. In this section we run a simple benchmark to showcase the benefits in speed and memory usage of streaming compared to full materialization with Pandas. This example uses the full NYC taxi dataset, which you can download as shown in the streaming section above.
+
+
+``` python
+# DuckDB
+import duckdb
+import pyarrow.dataset as ds
+
+# Open dataset using year,month folder partition
+nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
+
+# Get database connection
+con = duckdb.connect()
+
+# Run query that selects part of the data
+query = con.execute("SELECT total_amount, passenger_count,year FROM nyc where total_amount > 100 and year > 2014")
+
+# Create Record Batch Reader from Query Result.
+# "fetch_record_batch()" also accepts an extra parameter related to the desired produced chunk size.
+record_batch_reader = query.fetch_record_batch()
+
+# Retrieve all batch chunks, one at a time
+# (read_next_batch() raises StopIteration at the end of the stream,
+# so iterating over the reader is the simplest way to drain it)
+for chunk in record_batch_reader:
+    pass
+```
+
+``` python
+# Pandas
+import pyarrow as pa
+import pyarrow.dataset as ds
+
+# We must exclude one of the columns of the NYC dataset due to an unimplemented cast in Arrow.
+working_columns = ["vendor_id","pickup_at","dropoff_at","passenger_count","trip_distance","pickup_longitude",
+    "pickup_latitude","store_and_fwd_flag","dropoff_longitude","dropoff_latitude","payment_type",
+    "fare_amount","extra","mta_tax","tip_amount","tolls_amount","total_amount","year", "month"]
+
+# Open dataset using year,month folder partition
+nyc_dataset = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
+# Generate a scanner to skip problematic column
+dataset_scanner = nyc_dataset.scanner(columns=working_columns)
+
+# Materialize dataset to an Arrow Table
+nyc_table = dataset_scanner.to_table()
+
+# Generate Dataframe from Arrow Table
+nyc_df = nyc_table.to_pandas()
+
+# Apply Filter
+filtered_df = nyc_df[
+    (nyc_df.total_amount > 100) &
+    (nyc_df.year > 2014)]
+
+# Apply Projection
+res = filtered_df[["total_amount", "passenger_count","year"]]
+
+# Transform Result back to an Arrow Table
+new_table = pa.Table.from_pandas(res)
+```
+
+|    Name     | Time (s) | Peak Memory Usage (GB) |
+|-------------|---------:|-----------------------:|
+| DuckDB      | 0.05     | 0.3                    |
+| Pandas      | 146.91   | 248                    |
+
+The difference in times between DuckDB and Pandas is a combination of all the integration benefits we explored in this article. In DuckDB the filter pushdown is applied to perform partition elimination (i.e., we skip reading the Parquet files where the year is <= 2014). The filter pushdown is also used to eliminate unrelated row groups (i.e., row groups where the total amount is always <= 100). Due to our projection pushdown, Arrow only has to read the columns of interest from the Parque [...]
+
+In the table above, we also depict the comparison of peak memory usage between DuckDB (Streaming) and Pandas (Fully-Materializing).  In DuckDB, we only need to load the row-group of interest into memory. Hence our memory usage is low. We also have constant memory usage since we only have to keep one of these row groups in-memory at a time. Pandas, on the other hand, has to fully materialize all Parquet files when executing the query. Because of this, we see a constant steep increase in i [...]
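+
+If you want to verify what DuckDB pushes down into the Arrow scan, one option (a small sketch; the exact plan text varies across DuckDB versions) is to prefix the query with `EXPLAIN` and inspect the scan node for the projected columns and pushed-down filters:
+
+``` python
+import duckdb
+import pyarrow.dataset as ds
+
+nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
+con = duckdb.connect()
+
+# Print the physical plan; the scan over the Arrow dataset should list only
+# the referenced columns and the pushed-down filters
+for _, plan in con.execute(
+    "EXPLAIN SELECT total_amount, passenger_count, year "
+    "FROM nyc WHERE total_amount > 100 AND year > 2014"
+).fetchall():
+    print(plan)
+```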
+
+## Conclusion and Feedback
+In this blog post, we mainly showcased how to execute queries on Arrow datasets with DuckDB. There are additional libraries that can also consume the Arrow format, but they have different purposes and capabilities. As always, let us know if you would like to see benchmarks with different tools in a future post! Feel free to drop us an [email](mailto:pedro@duckdblabs.com;jon@voltrondata.com) or share your thoughts directly in the Hacker News post.
+
+Last but not least, if you encounter any problems when using our integration, please open an issue in either [DuckDB's issue tracker](https://github.com/duckdb/duckdb/issues) or [Arrow's issue tracker](https://issues.apache.org/jira/projects/ARROW/), depending on which library has the problem.
+
+[^1]: In Arrow 6.0.0, `to_arrow()` currently returns the full table, but will allow full streaming in our upcoming 7.0.0 release.