You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2023/01/19 14:46:46 UTC

[arrow-datafusion-python] branch master updated: Improve README and add examples (#137)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/master by this push:
     new e05f217  Improve README and add examples (#137)
e05f217 is described below

commit e05f217442d37d124e34d576c27901fb0c7a4e2a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Jan 19 07:46:40 2023 -0700

    Improve README and add examples (#137)
---
 README.md                      | 141 ++++++++++++-----------------------------
 examples/README.md             |  27 ++++++++
 examples/chart.png             | Bin 0 -> 19089 bytes
 examples/dataframe-parquet.py  |  25 ++++++++
 examples/python-udaf.py        |  74 +++++++++++++++++++++
 examples/python-udf.py         |  42 ++++++++++++
 examples/query-pyarrow-data.py |  43 +++++++++++++
 examples/sql-parquet.py        |  27 ++++++++
 examples/sql-to-pandas.py      |  49 ++++++++++++++
 src/functions.rs               |  29 ++++++++-
 10 files changed, 355 insertions(+), 102 deletions(-)

diff --git a/README.md b/README.md
index 3cd144b..17e872a 100644
--- a/README.md
+++ b/README.md
@@ -24,132 +24,71 @@
 
 This is a Python library that binds to [Apache Arrow](https://arrow.apache.org/) in-memory query engine [DataFusion](https://github.com/apache/arrow-datafusion).
 
-Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV files, run it in a multi-threaded environment, and obtain the result back in Python.
+Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV
+files, run it in a multi-threaded environment, and obtain the result back in Python.
 
 It also allows you to use UDFs and UDAFs for complex operations.
 
-The major advantage of this library over other execution engines is that this library achieves zero-copy between Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart from having to lock the GIL when running those operations.
+The major advantage of this library over other execution engines is that this library achieves zero-copy between
+Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart
+from having to lock the GIL when running those operations.
 
-Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions about thread safety and lack of memory leaks.
+Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions
+about thread safety and lack of memory leaks.
 
 Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html).
 
-## How to use it
+## Example Usage
 
-Simple usage:
+The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results
+in a Pandas DataFrame, and then plotting a chart.
 
-```python
-import datafusion
-from datafusion import col
-import pyarrow
-
-# create a context
-ctx = datafusion.SessionContext()
-
-# create a RecordBatch and a new DataFrame from it
-batch = pyarrow.RecordBatch.from_arrays(
-    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
-    names=["a", "b"],
-)
-df = ctx.create_dataframe([[batch]])
-
-# create a new statement
-df = df.select(
-    col("a") + col("b"),
-    col("a") - col("b"),
-)
-
-# execute and collect the first (and only) batch
-result = df.collect()[0]
-
-assert result.column(0) == pyarrow.array([5, 7, 9])
-assert result.column(1) == pyarrow.array([-3, -3, -3])
-```
-
-### UDFs
-
-```python
-import pyarrow
-from datafusion import udf
-
-def is_null(array: pyarrow.Array) -> pyarrow.Array:
-    return array.is_null()
-
-is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')
-
-# create a context
-ctx = datafusion.SessionContext()
-
-# create a RecordBatch and a new DataFrame from it
-batch = pyarrow.RecordBatch.from_arrays(
-    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
-    names=["a", "b"],
-)
-df = ctx.create_dataframe([[batch]])
-
-df = df.select(is_null_arr(col("a")))
+The Parquet file used in this example can be downloaded from the following page:
 
-result = df.collect()[0]
+- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
 
-assert result.column(0) == pyarrow.array([False] * 3)
-```
-
-### UDAF
+See the [examples](examples) directory for more examples.
 
 ```python
-import pyarrow
-import pyarrow.compute
-import datafusion
-from datafusion import udaf, Accumulator
-from datafusion import col
-
-
-class MyAccumulator(Accumulator):
-    """
-    Interface of a user-defined accumulation.
-    """
-    def __init__(self):
-        self._sum = pyarrow.scalar(0.0)
-
-    def update(self, values: pyarrow.Array) -> None:
-        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
-        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())
+from datafusion import SessionContext
+import pandas as pd
+import pyarrow as pa
 
-    def merge(self, states: pyarrow.Array) -> None:
-        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
-        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())
+# Create a DataFusion context
+ctx = SessionContext()
 
-    def state(self) -> pyarrow.Array:
-        return pyarrow.array([self._sum.as_py()])
+# Register table with context
+ctx.register_parquet('taxi', 'yellow_tripdata_2021-01.parquet')
 
-    def evaluate(self) -> pyarrow.Scalar:
-        return self._sum
+# Execute SQL
+df = ctx.sql("select passenger_count, count(*) "
+             "from taxi "
+             "where passenger_count is not null "
+             "group by passenger_count "
+             "order by passenger_count")
 
-# create a context
-ctx = datafusion.SessionContext()
+# collect as list of pyarrow.RecordBatch
+results = df.collect()
 
-# create a RecordBatch and a new DataFrame from it
-batch = pyarrow.RecordBatch.from_arrays(
-    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
-    names=["a", "b"],
-)
-df = ctx.create_dataframe([[batch]])
+# get first batch
+batch = results[0]
 
-my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')
+# convert to Pandas
+df = batch.to_pandas()
 
-df = df.aggregate(
-    [],
-    [my_udaf(col("a"))]
-)
+# create a chart
+fig = df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
+fig.savefig('chart.png')
+```
 
-result = df.collect()[0]
+This produces the following chart:
 
-assert result.column(0) == pyarrow.array([6.0])
-```
+![Chart](examples/chart.png)
 
 ## How to install (from pip)
 
 ### Pip
+
 ```bash
 pip install datafusion
 # or
@@ -157,6 +96,7 @@ python -m pip install datafusion
 ```
 
 ### Conda
+
 ```bash
 conda install -c conda-forge datafusion
 ```
@@ -169,7 +109,6 @@ You can verify the installation by running:
 '0.6.0'
 ```
 
-
 ## How to develop
 
 This assumes that you have rust and cargo installed. We use the workflow recommended by [pyo3](https://github.com/PyO3/pyo3) and [maturin](https://github.com/PyO3/maturin).
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 0000000..a3ae0ba
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,27 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Python Examples
+
+- [Query a Parquet file using SQL](./sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
+- [Query PyArrow Data](./query-pyarrow-data.py)
+- [Register a Python UDF with DataFusion](./python-udf.py)
+- [Register a Python UDAF with DataFusion](./python-udaf.py)
diff --git a/examples/chart.png b/examples/chart.png
new file mode 100644
index 0000000..743583e
Binary files /dev/null and b/examples/chart.png differ
diff --git a/examples/dataframe-parquet.py b/examples/dataframe-parquet.py
new file mode 100644
index 0000000..31a8aa6
--- /dev/null
+++ b/examples/dataframe-parquet.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion import functions as f
+
+ctx = SessionContext()
+df = ctx.read_parquet(
+    "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
+).aggregate([f.col("passenger_count")], [f.count_star()])
+df.show()
diff --git a/examples/python-udaf.py b/examples/python-udaf.py
new file mode 100644
index 0000000..ed705f5
--- /dev/null
+++ b/examples/python-udaf.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow
+import pyarrow.compute
+import datafusion
+from datafusion import udaf, Accumulator
+from datafusion import col
+
+
+class MyAccumulator(Accumulator):
+    """
+    Interface of a user-defined accumulation.
+    """
+
+    def __init__(self):
+        self._sum = pyarrow.scalar(0.0)
+
+    def update(self, values: pyarrow.Array) -> None:
+        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
+        self._sum = pyarrow.scalar(
+            self._sum.as_py() + pyarrow.compute.sum(values).as_py()
+        )
+
+    def merge(self, states: pyarrow.Array) -> None:
+        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
+        self._sum = pyarrow.scalar(
+            self._sum.as_py() + pyarrow.compute.sum(states).as_py()
+        )
+
+    def state(self) -> pyarrow.Array:
+        return pyarrow.array([self._sum.as_py()])
+
+    def evaluate(self) -> pyarrow.Scalar:
+        return self._sum
+
+
+# create a context
+ctx = datafusion.SessionContext()
+
+# create a RecordBatch and a new DataFrame from it
+batch = pyarrow.RecordBatch.from_arrays(
+    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
+    names=["a", "b"],
+)
+df = ctx.create_dataframe([[batch]])
+
+my_udaf = udaf(
+    MyAccumulator,
+    pyarrow.float64(),
+    pyarrow.float64(),
+    [pyarrow.float64()],
+    "stable",
+)
+
+df = df.aggregate([], [my_udaf(col("a"))])
+
+result = df.collect()[0]
+
+assert result.column(0) == pyarrow.array([6.0])
diff --git a/examples/python-udf.py b/examples/python-udf.py
new file mode 100644
index 0000000..30edd41
--- /dev/null
+++ b/examples/python-udf.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow
+from datafusion import udf, SessionContext, functions as f
+
+
+def is_null(array: pyarrow.Array) -> pyarrow.Array:
+    return array.is_null()
+
+
+is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), "stable")
+
+# create a context
+ctx = SessionContext()
+
+# create a RecordBatch and a new DataFrame from it
+batch = pyarrow.RecordBatch.from_arrays(
+    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
+    names=["a", "b"],
+)
+df = ctx.create_dataframe([[batch]])
+
+df = df.select(is_null_arr(f.col("a")))
+
+result = df.collect()[0]
+
+assert result.column(0) == pyarrow.array([False] * 3)
diff --git a/examples/query-pyarrow-data.py b/examples/query-pyarrow-data.py
new file mode 100644
index 0000000..83e6884
--- /dev/null
+++ b/examples/query-pyarrow-data.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datafusion
+from datafusion import col
+import pyarrow
+
+
+# create a context
+ctx = datafusion.SessionContext()
+
+# create a RecordBatch and a new DataFrame from it
+batch = pyarrow.RecordBatch.from_arrays(
+    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
+    names=["a", "b"],
+)
+df = ctx.create_dataframe([[batch]])
+
+# create a new statement
+df = df.select(
+    col("a") + col("b"),
+    col("a") - col("b"),
+)
+
+# execute and collect the first (and only) batch
+result = df.collect()[0]
+
+assert result.column(0) == pyarrow.array([5, 7, 9])
+assert result.column(1) == pyarrow.array([-3, -3, -3])
diff --git a/examples/sql-parquet.py b/examples/sql-parquet.py
new file mode 100644
index 0000000..7b2db6f
--- /dev/null
+++ b/examples/sql-parquet.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+
+ctx = SessionContext()
+ctx.register_parquet(
+    "taxi", "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
+)
+df = ctx.sql(
+    "select passenger_count, count(*) from taxi where passenger_count is not null group by passenger_count order by passenger_count"
+)
+df.show()
diff --git a/examples/sql-to-pandas.py b/examples/sql-to-pandas.py
new file mode 100644
index 0000000..3569e6d
--- /dev/null
+++ b/examples/sql-to-pandas.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+
+
+# Create a DataFusion context
+ctx = SessionContext()
+
+# Register table with context
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+
+# Execute SQL
+df = ctx.sql(
+    "select passenger_count, count(*) "
+    "from taxi "
+    "where passenger_count is not null "
+    "group by passenger_count "
+    "order by passenger_count"
+)
+
+# collect as list of pyarrow.RecordBatch
+results = df.collect()
+
+# get first batch
+batch = results[0]
+
+# convert to Pandas
+df = batch.to_pandas()
+
+# create a chart
+fig = df.plot(
+    kind="bar", title="Trip Count by Number of Passengers"
+).get_figure()
+fig.savefig("chart.png")
diff --git a/src/functions.rs b/src/functions.rs
index d40ffbe..ac1077e 100644
--- a/src/functions.rs
+++ b/src/functions.rs
@@ -17,10 +17,11 @@
 
 use pyo3::{prelude::*, wrap_pyfunction};
 
+use datafusion_common::Column;
 use datafusion_expr::expr::AggregateFunction;
 use datafusion_expr::expr::{Sort, WindowFunction};
 use datafusion_expr::window_function::find_df_window_func;
-use datafusion_expr::{lit, BuiltinScalarFunction, WindowFrame};
+use datafusion_expr::{aggregate_function, lit, BuiltinScalarFunction, Expr, WindowFrame};
 
 use crate::errors::DataFusionError;
 use crate::expression::PyExpr;
@@ -81,6 +82,30 @@ fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
     })
 }
 
+/// Create a column reference expression
+#[pyfunction]
+fn col(name: &str) -> PyResult<PyExpr> {
+    Ok(PyExpr {
+        expr: datafusion_expr::Expr::Column(Column {
+            relation: None,
+            name: name.to_string(),
+        }),
+    })
+}
+
+/// Create a COUNT(1) aggregate expression
+#[pyfunction]
+fn count_star() -> PyResult<PyExpr> {
+    Ok(PyExpr {
+        expr: Expr::AggregateFunction(AggregateFunction {
+            fun: aggregate_function::AggregateFunction::Count,
+            args: vec![lit(1)],
+            distinct: false,
+            filter: None,
+        }),
+    })
+}
+
 /// Creates a new Window function expression
 #[pyfunction]
 fn window(
@@ -294,10 +319,12 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_wrapped(wrap_pyfunction!(chr))?;
     m.add_wrapped(wrap_pyfunction!(char_length))?;
     m.add_wrapped(wrap_pyfunction!(coalesce))?;
+    m.add_wrapped(wrap_pyfunction!(col))?;
     m.add_wrapped(wrap_pyfunction!(concat_ws))?;
     m.add_wrapped(wrap_pyfunction!(concat))?;
     m.add_wrapped(wrap_pyfunction!(cos))?;
     m.add_wrapped(wrap_pyfunction!(count))?;
+    m.add_wrapped(wrap_pyfunction!(count_star))?;
     m.add_wrapped(wrap_pyfunction!(current_date))?;
     m.add_wrapped(wrap_pyfunction!(current_time))?;
     m.add_wrapped(wrap_pyfunction!(date_bin))?;