You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2023/01/19 14:46:46 UTC
[arrow-datafusion-python] branch master updated: Improve README and add examples (#137)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/master by this push:
new e05f217 Improve README and add examples (#137)
e05f217 is described below
commit e05f217442d37d124e34d576c27901fb0c7a4e2a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Jan 19 07:46:40 2023 -0700
Improve README and add examples (#137)
---
README.md | 141 ++++++++++++-----------------------------
examples/README.md | 27 ++++++++
examples/chart.png | Bin 0 -> 19089 bytes
examples/dataframe-parquet.py | 25 ++++++++
examples/python-udaf.py | 74 +++++++++++++++++++++
examples/python-udf.py | 42 ++++++++++++
examples/query-pyarrow-data.py | 43 +++++++++++++
examples/sql-parquet.py | 27 ++++++++
examples/sql-to-pandas.py | 49 ++++++++++++++
src/functions.rs | 29 ++++++++-
10 files changed, 355 insertions(+), 102 deletions(-)
diff --git a/README.md b/README.md
index 3cd144b..17e872a 100644
--- a/README.md
+++ b/README.md
@@ -24,132 +24,71 @@
This is a Python library that binds to the [Apache Arrow](https://arrow.apache.org/) in-memory query engine [DataFusion](https://github.com/apache/arrow-datafusion).
-Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV files, run it in a multi-threaded environment, and obtain the result back in Python.
+Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV
+files, run it in a multi-threaded environment, and obtain the result back in Python.
It also allows you to use UDFs and UDAFs for complex operations.
-The major advantage of this library over other execution engines is that this library achieves zero-copy between Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart from having to lock the GIL when running those operations.
+The major advantage of this library over other execution engines is that this library achieves zero-copy between
+Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart
+from having to lock the GIL when running those operations.
-Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions about thread safety and lack of memory leaks.
+Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions
+about thread safety and lack of memory leaks.
Technically, zero-copy is achieved via the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html).
-## How to use it
+## Example Usage
-Simple usage:
+The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results
+in a Pandas DataFrame, and then plotting a chart.
-```python
-import datafusion
-from datafusion import col
-import pyarrow
-
-# create a context
-ctx = datafusion.SessionContext()
-
-# create a RecordBatch and a new DataFrame from it
-batch = pyarrow.RecordBatch.from_arrays(
- [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
- names=["a", "b"],
-)
-df = ctx.create_dataframe([[batch]])
-
-# create a new statement
-df = df.select(
- col("a") + col("b"),
- col("a") - col("b"),
-)
-
-# execute and collect the first (and only) batch
-result = df.collect()[0]
-
-assert result.column(0) == pyarrow.array([5, 7, 9])
-assert result.column(1) == pyarrow.array([-3, -3, -3])
-```
-
-### UDFs
-
-```python
-import pyarrow
-from datafusion import udf
-
-def is_null(array: pyarrow.Array) -> pyarrow.Array:
- return array.is_null()
-
-is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')
-
-# create a context
-ctx = datafusion.SessionContext()
-
-# create a RecordBatch and a new DataFrame from it
-batch = pyarrow.RecordBatch.from_arrays(
- [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
- names=["a", "b"],
-)
-df = ctx.create_dataframe([[batch]])
-
-df = df.select(is_null_arr(col("a")))
+The Parquet file used in this example can be downloaded from the following page:
-result = df.collect()[0]
+- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
-assert result.column(0) == pyarrow.array([False] * 3)
-```
-
-### UDAF
+See the [examples](examples) directory for more examples.
```python
-import pyarrow
-import pyarrow.compute
-import datafusion
-from datafusion import udaf, Accumulator
-from datafusion import col
-
-
-class MyAccumulator(Accumulator):
- """
- Interface of a user-defined accumulation.
- """
- def __init__(self):
- self._sum = pyarrow.scalar(0.0)
-
- def update(self, values: pyarrow.Array) -> None:
- # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
- self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())
+from datafusion import SessionContext
+import pandas as pd
+import pyarrow as pa
- def merge(self, states: pyarrow.Array) -> None:
- # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
- self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())
+# Create a DataFusion context
+ctx = SessionContext()
- def state(self) -> pyarrow.Array:
- return pyarrow.array([self._sum.as_py()])
+# Register table with context
+ctx.register_parquet('taxi', 'yellow_tripdata_2021-01.parquet')
- def evaluate(self) -> pyarrow.Scalar:
- return self._sum
+# Execute SQL
+df = ctx.sql("select passenger_count, count(*) "
+ "from taxi "
+ "where passenger_count is not null "
+ "group by passenger_count "
+ "order by passenger_count")
-# create a context
-ctx = datafusion.SessionContext()
+# collect as list of pyarrow.RecordBatch
+results = df.collect()
-# create a RecordBatch and a new DataFrame from it
-batch = pyarrow.RecordBatch.from_arrays(
- [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
- names=["a", "b"],
-)
-df = ctx.create_dataframe([[batch]])
+# get first batch
+batch = results[0]
-my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')
+# convert to Pandas
+df = batch.to_pandas()
-df = df.aggregate(
- [],
- [my_udaf(col("a"))]
-)
+# create a chart
+fig = df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
+fig.savefig('chart.png')
+```
-result = df.collect()[0]
+This produces the following chart:
-assert result.column(0) == pyarrow.array([6.0])
-```
+![Chart](examples/chart.png)
## How to install (from pip)
### Pip
+
```bash
pip install datafusion
# or
@@ -157,6 +96,7 @@ python -m pip install datafusion
```
### Conda
+
```bash
conda install -c conda-forge datafusion
```
@@ -169,7 +109,6 @@ You can verify the installation by running:
'0.6.0'
```
-
## How to develop
This assumes that you have rust and cargo installed. We use the workflow recommended by [pyo3](https://github.com/PyO3/pyo3) and [maturin](https://github.com/PyO3/maturin).
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 0000000..a3ae0ba
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,27 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DataFusion Python Examples
+
+- [Query a Parquet file using SQL](./sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
+- [Query PyArrow Data](./query-pyarrow-data.py)
+- [Register a Python UDF with DataFusion](./python-udf.py)
+- [Register a Python UDAF with DataFusion](./python-udaf.py)
diff --git a/examples/chart.png b/examples/chart.png
new file mode 100644
index 0000000..743583e
Binary files /dev/null and b/examples/chart.png differ
diff --git a/examples/dataframe-parquet.py b/examples/dataframe-parquet.py
new file mode 100644
index 0000000..31a8aa6
--- /dev/null
+++ b/examples/dataframe-parquet.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion import functions as f
+
+ctx = SessionContext()
+df = ctx.read_parquet(
+ "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
+).aggregate([f.col("passenger_count")], [f.count_star()])
+df.show()
diff --git a/examples/python-udaf.py b/examples/python-udaf.py
new file mode 100644
index 0000000..ed705f5
--- /dev/null
+++ b/examples/python-udaf.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow
+import pyarrow.compute
+import datafusion
+from datafusion import udaf, Accumulator
+from datafusion import col
+
+
+class MyAccumulator(Accumulator):
+ """
+ Interface of a user-defined accumulation.
+ """
+
+ def __init__(self):
+ self._sum = pyarrow.scalar(0.0)
+
+ def update(self, values: pyarrow.Array) -> None:
+ # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
+ self._sum = pyarrow.scalar(
+ self._sum.as_py() + pyarrow.compute.sum(values).as_py()
+ )
+
+ def merge(self, states: pyarrow.Array) -> None:
+ # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
+ self._sum = pyarrow.scalar(
+ self._sum.as_py() + pyarrow.compute.sum(states).as_py()
+ )
+
+ def state(self) -> pyarrow.Array:
+ return pyarrow.array([self._sum.as_py()])
+
+ def evaluate(self) -> pyarrow.Scalar:
+ return self._sum
+
+
+# create a context
+ctx = datafusion.SessionContext()
+
+# create a RecordBatch and a new DataFrame from it
+batch = pyarrow.RecordBatch.from_arrays(
+ [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
+ names=["a", "b"],
+)
+df = ctx.create_dataframe([[batch]])
+
+my_udaf = udaf(
+ MyAccumulator,
+ pyarrow.float64(),
+ pyarrow.float64(),
+ [pyarrow.float64()],
+ "stable",
+)
+
+df = df.aggregate([], [my_udaf(col("a"))])
+
+result = df.collect()[0]
+
+assert result.column(0) == pyarrow.array([6.0])
diff --git a/examples/python-udf.py b/examples/python-udf.py
new file mode 100644
index 0000000..30edd41
--- /dev/null
+++ b/examples/python-udf.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow
+from datafusion import udf, SessionContext, functions as f
+
+
+def is_null(array: pyarrow.Array) -> pyarrow.Array:
+ return array.is_null()
+
+
+is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), "stable")
+
+# create a context
+ctx = SessionContext()
+
+# create a RecordBatch and a new DataFrame from it
+batch = pyarrow.RecordBatch.from_arrays(
+ [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
+ names=["a", "b"],
+)
+df = ctx.create_dataframe([[batch]])
+
+df = df.select(is_null_arr(f.col("a")))
+
+result = df.collect()[0]
+
+assert result.column(0) == pyarrow.array([False] * 3)
diff --git a/examples/query-pyarrow-data.py b/examples/query-pyarrow-data.py
new file mode 100644
index 0000000..83e6884
--- /dev/null
+++ b/examples/query-pyarrow-data.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datafusion
+from datafusion import col
+import pyarrow
+
+
+# create a context
+ctx = datafusion.SessionContext()
+
+# create a RecordBatch and a new DataFrame from it
+batch = pyarrow.RecordBatch.from_arrays(
+ [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
+ names=["a", "b"],
+)
+df = ctx.create_dataframe([[batch]])
+
+# create a new statement
+df = df.select(
+ col("a") + col("b"),
+ col("a") - col("b"),
+)
+
+# execute and collect the first (and only) batch
+result = df.collect()[0]
+
+assert result.column(0) == pyarrow.array([5, 7, 9])
+assert result.column(1) == pyarrow.array([-3, -3, -3])
diff --git a/examples/sql-parquet.py b/examples/sql-parquet.py
new file mode 100644
index 0000000..7b2db6f
--- /dev/null
+++ b/examples/sql-parquet.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+
+ctx = SessionContext()
+ctx.register_parquet(
+ "taxi", "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
+)
+df = ctx.sql(
+ "select passenger_count, count(*) from taxi where passenger_count is not null group by passenger_count order by passenger_count"
+)
+df.show()
diff --git a/examples/sql-to-pandas.py b/examples/sql-to-pandas.py
new file mode 100644
index 0000000..3569e6d
--- /dev/null
+++ b/examples/sql-to-pandas.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+
+
+# Create a DataFusion context
+ctx = SessionContext()
+
+# Register table with context
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+
+# Execute SQL
+df = ctx.sql(
+ "select passenger_count, count(*) "
+ "from taxi "
+ "where passenger_count is not null "
+ "group by passenger_count "
+ "order by passenger_count"
+)
+
+# collect as list of pyarrow.RecordBatch
+results = df.collect()
+
+# get first batch
+batch = results[0]
+
+# convert to Pandas
+df = batch.to_pandas()
+
+# create a chart
+fig = df.plot(
+ kind="bar", title="Trip Count by Number of Passengers"
+).get_figure()
+fig.savefig("chart.png")
diff --git a/src/functions.rs b/src/functions.rs
index d40ffbe..ac1077e 100644
--- a/src/functions.rs
+++ b/src/functions.rs
@@ -17,10 +17,11 @@
use pyo3::{prelude::*, wrap_pyfunction};
+use datafusion_common::Column;
use datafusion_expr::expr::AggregateFunction;
use datafusion_expr::expr::{Sort, WindowFunction};
use datafusion_expr::window_function::find_df_window_func;
-use datafusion_expr::{lit, BuiltinScalarFunction, WindowFrame};
+use datafusion_expr::{aggregate_function, lit, BuiltinScalarFunction, Expr, WindowFrame};
use crate::errors::DataFusionError;
use crate::expression::PyExpr;
@@ -81,6 +82,30 @@ fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
})
}
+/// Create an expression that refers to the column called `name`.
+///
+/// The column is built without a relation (table) qualifier.
+#[pyfunction]
+fn col(name: &str) -> PyResult<PyExpr> {
+    let column = Column {
+        relation: None,
+        name: name.to_owned(),
+    };
+    Ok(PyExpr {
+        expr: Expr::Column(column),
+    })
+}
+
+/// Create a `COUNT(1)` aggregate expression.
+///
+/// The aggregate is non-distinct, has no filter, and takes the literal `1`
+/// as its only argument.
+#[pyfunction]
+fn count_star() -> PyResult<PyExpr> {
+    let count_one = AggregateFunction {
+        fun: aggregate_function::AggregateFunction::Count,
+        args: vec![lit(1)],
+        distinct: false,
+        filter: None,
+    };
+    Ok(PyExpr {
+        expr: Expr::AggregateFunction(count_one),
+    })
+}
+
/// Creates a new Window function expression
#[pyfunction]
fn window(
@@ -294,10 +319,12 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(chr))?;
m.add_wrapped(wrap_pyfunction!(char_length))?;
m.add_wrapped(wrap_pyfunction!(coalesce))?;
+ m.add_wrapped(wrap_pyfunction!(col))?;
m.add_wrapped(wrap_pyfunction!(concat_ws))?;
m.add_wrapped(wrap_pyfunction!(concat))?;
m.add_wrapped(wrap_pyfunction!(cos))?;
m.add_wrapped(wrap_pyfunction!(count))?;
+ m.add_wrapped(wrap_pyfunction!(count_star))?;
m.add_wrapped(wrap_pyfunction!(current_date))?;
m.add_wrapped(wrap_pyfunction!(current_time))?;
m.add_wrapped(wrap_pyfunction!(date_bin))?;