You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2023/01/19 01:17:51 UTC
[arrow-datafusion-python] branch master updated: Upgrade to DataFusion 16.0.0 (#115)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/master by this push:
new 575367c Upgrade to DataFusion 16.0.0 (#115)
575367c is described below
commit 575367c965a5d1257846dccc3a0ef00a8fd5c5ce
Author: Andy Grove <an...@gmail.com>
AuthorDate: Wed Jan 18 18:17:45 2023 -0700
Upgrade to DataFusion 16.0.0 (#115)
---
.github/workflows/test.yaml | 2 +-
Cargo.lock | 327 ++++++---------------
Cargo.toml | 11 +-
datafusion/tests/test_config.py | 12 +-
datafusion/tests/test_dataframe.py | 3 +
.../tests/test_config.py => dev/python_lint.sh | 31 +-
src/catalog.rs | 14 +-
src/config.rs | 28 +-
src/context.rs | 80 ++---
src/dataframe.rs | 85 ++++--
src/dataset_exec.rs | 4 -
src/functions.rs | 33 ++-
src/udaf.rs | 14 +-
13 files changed, 264 insertions(+), 380 deletions(-)
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index a416fbd..327b1cf 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -96,7 +96,7 @@ jobs:
if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
run: |
source venv/bin/activate
- flake8 --exclude venv --ignore=E501
+ flake8 --exclude venv --ignore=E501,W503
black --line-length 79 --diff --check .
- name: Run tests
diff --git a/Cargo.lock b/Cargo.lock
index e4e14f9..0f3fbea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -101,9 +101,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "arrow"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aed9849f86164fad5cb66ce4732782b15f1bc97f8febab04e782c20cce9d4b6c"
+checksum = "2fe17dc0113da7e2eaeaedbd304d347aa8ea64916d225b79a5c3f3b6b5d8da4c"
dependencies = [
"ahash",
"arrow-array",
@@ -113,8 +113,10 @@ dependencies = [
"arrow-data",
"arrow-ipc",
"arrow-json",
+ "arrow-ord",
"arrow-schema",
"arrow-select",
+ "arrow-string",
"bitflags",
"chrono",
"comfy-table",
@@ -124,14 +126,13 @@ dependencies = [
"num",
"pyo3",
"regex",
- "regex-syntax",
]
[[package]]
name = "arrow-array"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b8504cf0a6797e908eecf221a865e7d339892720587f87c8b90262863015b08"
+checksum = "b9452131e027aec3276e43449162af084db611c42ef875e54d231e6580bc6254"
dependencies = [
"ahash",
"arrow-buffer",
@@ -145,9 +146,9 @@ dependencies = [
[[package]]
name = "arrow-buffer"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d6de64a27cea684b24784647d9608314bc80f7c4d55acb44a425e05fab39d916"
+checksum = "4a301001e8ed7da638a12fa579ac5f3f154c44c0655f2ca6ed0f8586b418a779"
dependencies = [
"half",
"num",
@@ -155,9 +156,9 @@ dependencies = [
[[package]]
name = "arrow-cast"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bec4a54502eefe05923c385c90a005d69474fa06ca7aa2a2b123c9f9532f6178"
+checksum = "048c91d067f2eb8cc327f086773e5b0f0d7714780807fc4db09366584e23bac8"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -171,9 +172,9 @@ dependencies = [
[[package]]
name = "arrow-csv"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7902bbf8127eac48554fe902775303377047ad49a9fd473c2b8cb399d092080"
+checksum = "ed914cd0006a3bb9cac8136b3098ac7796ad26b82362f00d4f2e7c1a54684b86"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -189,9 +190,9 @@ dependencies = [
[[package]]
name = "arrow-data"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7e4882efe617002449d5c6b5de9ddb632339074b36df8a96ea7147072f1faa8a"
+checksum = "e59619d9d102e4e6b22087b2bd60c07df76fcb68683620841718f6bc8e8f02cb"
dependencies = [
"arrow-buffer",
"arrow-schema",
@@ -201,9 +202,9 @@ dependencies = [
[[package]]
name = "arrow-ipc"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa0703a6de2785828561b03a4d7793ecd333233e1b166316b4bfc7cfce55a4a7"
+checksum = "fb7ad6d2fa06a1cebdaa213c59fc953b9230e560d8374aba133b572b864ec55e"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -215,9 +216,9 @@ dependencies = [
[[package]]
name = "arrow-json"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3bd23fc8c6d251f96cd63b96fece56bbb9710ce5874a627cb786e2600673595a"
+checksum = "1e22efab3ad70336057660c5e5f2b72e2417e3444c27cb42dc477d678ddd6979"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -231,17 +232,31 @@ dependencies = [
"serde_json",
]
+[[package]]
+name = "arrow-ord"
+version = "29.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e23b623332804a65ad11e7732c351896dcb132c19f8e25d99fdb13b00aae5206"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+ "num",
+]
+
[[package]]
name = "arrow-schema"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "da9f143882a80be168538a60e298546314f50f11f2a288c8d73e11108da39d26"
+checksum = "69ef17c144f1253b9864f5a3e8f4c6f1e436bdd52394855d5942f132f776b64e"
[[package]]
name = "arrow-select"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "520406331d4ad60075359524947ebd804e479816439af82bcb17f8d280d9b38c"
+checksum = "e2accaf218ff107e3df0ee8f1e09b092249a1cc741c4377858a1470fd27d7096"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -250,6 +265,21 @@ dependencies = [
"num",
]
+[[package]]
+name = "arrow-string"
+version = "29.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4a0954f9e1f45b04815ddacbde72899bf3c03a08fa6c0375f42178c4a01a510"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+ "regex",
+ "regex-syntax",
+]
+
[[package]]
name = "async-compression"
version = "0.3.15"
@@ -433,43 +463,6 @@ dependencies = [
"winapi",
]
-[[package]]
-name = "clap"
-version = "4.0.29"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4d63b9e9c07271b9957ad22c173bae2a4d9a81127680962039296abcd2f8251d"
-dependencies = [
- "bitflags",
- "clap_derive",
- "clap_lex",
- "is-terminal",
- "once_cell",
- "strsim",
- "termcolor",
-]
-
-[[package]]
-name = "clap_derive"
-version = "4.0.21"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014"
-dependencies = [
- "heck",
- "proc-macro-error",
- "proc-macro2",
- "quote",
- "syn",
-]
-
-[[package]]
-name = "clap_lex"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8"
-dependencies = [
- "os_str_bytes",
-]
-
[[package]]
name = "codespan-reporting"
version = "0.11.1"
@@ -640,9 +633,9 @@ dependencies = [
[[package]]
name = "datafusion"
-version = "15.0.0"
+version = "16.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b75a088adf79515b04fd3895c1a14dc249c8f7a7f27b59870a05546fe9a55542"
+checksum = "f166d67281ee90d0b35d93a9d13b3b32c26f35348da9e5101a12e9de6093bd06"
dependencies = [
"ahash",
"apache-avro",
@@ -663,6 +656,7 @@ dependencies = [
"futures",
"glob",
"hashbrown 0.13.1",
+ "indexmap",
"itertools",
"lazy_static",
"log",
@@ -677,7 +671,6 @@ dependencies = [
"pyo3",
"rand",
"smallvec",
- "sqllogictest",
"sqlparser",
"tempfile",
"tokio",
@@ -690,13 +683,14 @@ dependencies = [
[[package]]
name = "datafusion-common"
-version = "15.0.0"
+version = "16.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7b17262b899f79afdf502846d1138a8b48441afe24dc6e07c922105289248137"
+checksum = "5cb2144c73ca974b00bf735a6e8692efe22c65731097bc49018a0edfbd1d0120"
dependencies = [
"apache-avro",
"arrow",
"chrono",
+ "num_cpus",
"object_store",
"parquet",
"pyo3",
@@ -705,9 +699,9 @@ dependencies = [
[[package]]
name = "datafusion-expr"
-version = "15.0.0"
+version = "16.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "533d2226b4636a1306d1f6f4ac02e436947c5d6e8bfc85f6d8f91a425c10a407"
+checksum = "b6b988765372fdee77d805dda00cb7ffb28dfda831cc0b79aff9e09527b70402"
dependencies = [
"ahash",
"arrow",
@@ -718,9 +712,9 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
-version = "15.0.0"
+version = "16.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce7ba274267b6baf1714a67727249aa56d648c8814b0f4c43387fbe6d147e619"
+checksum = "50b44aa5128e3e6f76d7ae8efc3a595133abd74aa60c3f50f39f93ea00ef302c"
dependencies = [
"arrow",
"async-trait",
@@ -730,13 +724,14 @@ dependencies = [
"datafusion-physical-expr",
"hashbrown 0.13.1",
"log",
+ "regex-syntax",
]
[[package]]
name = "datafusion-physical-expr"
-version = "15.0.0"
+version = "16.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f35cb53e6c2f9c40accdf45aef2be7fde030ea3051b1145a059d96109e65b0bf"
+checksum = "0fd7e12ce4d61eb698a83fde3bf14417fe67a23d7131b1f00c28dc70183aeed7"
dependencies = [
"ahash",
"arrow",
@@ -750,6 +745,7 @@ dependencies = [
"datafusion-row",
"half",
"hashbrown 0.13.1",
+ "indexmap",
"itertools",
"lazy_static",
"md-5",
@@ -777,15 +773,16 @@ dependencies = [
"parking_lot",
"pyo3",
"rand",
+ "regex-syntax",
"tokio",
"uuid 0.8.2",
]
[[package]]
name = "datafusion-row"
-version = "15.0.0"
+version = "16.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "27c77b1229ae5cf6a6e0e2ba43ed4e98131dbf1cc4a97fad17c94230b32e0812"
+checksum = "e34f6c09f1458190bb90305d70c2075bf2dd4cf8c51a65d5635e5217a3bb8bff"
dependencies = [
"arrow",
"datafusion-common",
@@ -795,22 +792,17 @@ dependencies = [
[[package]]
name = "datafusion-sql"
-version = "15.0.0"
+version = "16.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "569423fa8a50db39717080949e3b4f8763582b87baf393cc3fcf27cc21467ba7"
+checksum = "b42f29fd2c98e0e0030db4638f971c91145ef5e67ab139f0426b2891e14b9bf5"
dependencies = [
"arrow-schema",
"datafusion-common",
"datafusion-expr",
+ "log",
"sqlparser",
]
-[[package]]
-name = "difference"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
-
[[package]]
name = "digest"
version = "0.10.5"
@@ -843,27 +835,6 @@ dependencies = [
"cfg-if",
]
-[[package]]
-name = "errno"
-version = "0.2.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1"
-dependencies = [
- "errno-dragonfly",
- "libc",
- "winapi",
-]
-
-[[package]]
-name = "errno-dragonfly"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
-dependencies = [
- "cc",
- "libc",
-]
-
[[package]]
name = "fastrand"
version = "1.8.0"
@@ -1083,15 +1054,6 @@ dependencies = [
"libc",
]
-[[package]]
-name = "hermit-abi"
-version = "0.2.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
-dependencies = [
- "libc",
-]
-
[[package]]
name = "http"
version = "0.2.8"
@@ -1126,12 +1088,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
-[[package]]
-name = "humantime"
-version = "2.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
-
[[package]]
name = "hyper"
version = "0.14.20"
@@ -1205,9 +1161,9 @@ dependencies = [
[[package]]
name = "indexmap"
-version = "1.9.1"
+version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
+checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
@@ -1234,34 +1190,12 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
-[[package]]
-name = "io-lifetimes"
-version = "1.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c"
-dependencies = [
- "libc",
- "windows-sys 0.42.0",
-]
-
[[package]]
name = "ipnet"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b"
-[[package]]
-name = "is-terminal"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "927609f78c2913a6f6ac3c27a4fe87f43e2a35367c0c4b0f8265e8f49a104330"
-dependencies = [
- "hermit-abi 0.2.6",
- "io-lifetimes",
- "rustix",
- "windows-sys 0.42.0",
-]
-
[[package]]
name = "itertools"
version = "0.10.5"
@@ -1413,17 +1347,6 @@ dependencies = [
"libc",
]
-[[package]]
-name = "libtest-mimic"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d7b603516767d1ab23d0de09d023e62966c3322f7148297c35cf3d97aa8b37fa"
-dependencies = [
- "clap",
- "termcolor",
- "threadpool",
-]
-
[[package]]
name = "link-cplusplus"
version = "1.0.7"
@@ -1433,12 +1356,6 @@ dependencies = [
"cc",
]
-[[package]]
-name = "linux-raw-sys"
-version = "0.1.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f"
-
[[package]]
name = "lock_api"
version = "0.4.9"
@@ -1652,7 +1569,7 @@ version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
- "hermit-abi 0.1.19",
+ "hermit-abi",
"libc",
]
@@ -1699,12 +1616,6 @@ dependencies = [
"num-traits",
]
-[[package]]
-name = "os_str_bytes"
-version = "6.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee"
-
[[package]]
name = "parking_lot"
version = "0.12.1"
@@ -1730,9 +1641,9 @@ dependencies = [
[[package]]
name = "parquet"
-version = "28.0.0"
+version = "29.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "21433e9209111bb3720b747f2f137e0d115af1af0420a7a1c26b6e88227fa353"
+checksum = "d906343fd18ace6b998d5074697743e8e9358efa8c3c796a1381b98cba813338"
dependencies = [
"ahash",
"arrow-array",
@@ -1797,30 +1708,6 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
-[[package]]
-name = "proc-macro-error"
-version = "1.0.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
-dependencies = [
- "proc-macro-error-attr",
- "proc-macro2",
- "quote",
- "syn",
- "version_check",
-]
-
-[[package]]
-name = "proc-macro-error-attr"
-version = "1.0.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
-dependencies = [
- "proc-macro2",
- "quote",
- "version_check",
-]
-
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -1979,9 +1866,9 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
[[package]]
name = "regex-syntax"
-version = "0.6.27"
+version = "0.6.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244"
+checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
[[package]]
name = "remove_dir_all"
@@ -2053,20 +1940,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
-[[package]]
-name = "rustix"
-version = "0.36.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb93e85278e08bb5788653183213d3a60fc242b10cb9be96586f5a73dcb67c23"
-dependencies = [
- "bitflags",
- "errno",
- "io-lifetimes",
- "libc",
- "linux-raw-sys",
- "windows-sys 0.42.0",
-]
-
[[package]]
name = "rustls"
version = "0.20.6"
@@ -2251,31 +2124,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
-name = "sqllogictest"
-version = "0.8.0"
+name = "sqlparser"
+version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba41e01d229d7725401de371e323851f82d839d68732a06162405362b60852fe"
+checksum = "db67dc6ef36edb658196c3fef0464a80b53dbbc194a904e81f9bd4190f9ecc5b"
dependencies = [
- "async-trait",
- "difference",
- "futures",
- "glob",
- "humantime",
- "itertools",
- "libtest-mimic",
- "regex",
- "tempfile",
- "thiserror",
- "tracing",
+ "log",
+ "sqlparser_derive",
]
[[package]]
-name = "sqlparser"
-version = "0.27.0"
+name = "sqlparser_derive"
+version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aba319938d4bfe250a769ac88278b629701024fe16f34257f9563bc628081970"
+checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
dependencies = [
- "log",
+ "proc-macro2",
+ "quote",
+ "syn",
]
[[package]]
@@ -2284,12 +2150,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
-[[package]]
-name = "strsim"
-version = "0.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
-
[[package]]
name = "strum"
version = "0.24.1"
@@ -2375,15 +2235,6 @@ dependencies = [
"syn",
]
-[[package]]
-name = "threadpool"
-version = "1.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
-dependencies = [
- "num_cpus",
-]
-
[[package]]
name = "thrift"
version = "0.17.0"
diff --git a/Cargo.toml b/Cargo.toml
index 8c32d7e..9569384 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,7 +25,7 @@ description = "Apache Arrow DataFusion DataFrame and SQL Query Engine"
readme = "README.md"
license = "Apache-2.0"
edition = "2021"
-rust-version = "1.57"
+rust-version = "1.62"
[features]
default = ["mimalloc"]
@@ -34,16 +34,17 @@ default = ["mimalloc"]
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync"] }
rand = "0.8"
pyo3 = { version = "~0.17.3", features = ["extension-module", "abi3", "abi3-py37"] }
-datafusion = { version = "^15.0.0", features = ["pyarrow", "avro"] }
-datafusion-expr = { version = "^15.0.0" }
-datafusion-optimizer = { version = "^15.0.0" }
-datafusion-common = { version = "^15.0.0", features = ["pyarrow"] }
+datafusion = { version = "16.0.0", features = ["pyarrow", "avro"] }
+datafusion-expr = { version = "16.0.0" }
+datafusion-optimizer = { version = "16.0.0" }
+datafusion-common = { version = "16.0.0", features = ["pyarrow"] }
uuid = { version = "0.8", features = ["v4"] }
mimalloc = { version = "*", optional = true, default-features = false }
async-trait = "0.1"
futures = "0.3"
object_store = { version = "0.5.3", features = ["aws", "gcp", "azure"] }
parking_lot = "0.12"
+regex-syntax = "0.6.28"
[lib]
name = "datafusion_python"
diff --git a/datafusion/tests/test_config.py b/datafusion/tests/test_config.py
index 1e06161..960e72c 100644
--- a/datafusion/tests/test_config.py
+++ b/datafusion/tests/test_config.py
@@ -27,14 +27,18 @@ def config():
def test_get_then_set(config):
config_key = "datafusion.optimizer.filter_null_join_keys"
- assert config.get(config_key).as_py() is False
+ assert config.get(config_key) == "false"
- config.set(config_key, True)
- assert config.get(config_key).as_py() is True
+ config.set(config_key, "true")
+ assert config.get(config_key) == "true"
def test_get_all(config):
- config.get_all()
+ config_dict = config.get_all()
+ assert (
+ config_dict["datafusion.catalog.create_default_catalog_and_schema"]
+ == "true"
+ )
def test_get_invalid_config(config):
diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index 51f7c22..aac5db2 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -197,6 +197,9 @@ def test_distinct():
assert df_a.collect() == df_b.collect()
+@pytest.mark.skip(
+ reason="https://github.com/apache/arrow-datafusion-python/issues/135"
+)
def test_window_lead(df):
df = df.select(
column("a"),
diff --git a/datafusion/tests/test_config.py b/dev/python_lint.sh
old mode 100644
new mode 100755
similarity index 63%
copy from datafusion/tests/test_config.py
copy to dev/python_lint.sh
index 1e06161..9493462
--- a/datafusion/tests/test_config.py
+++ b/dev/python_lint.sh
@@ -1,3 +1,5 @@
+#!/bin/bash
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,27 +17,10 @@
# specific language governing permissions and limitations
# under the License.
-from datafusion import Config
-import pytest
-
-
-@pytest.fixture
-def config():
- return Config()
-
-
-def test_get_then_set(config):
- config_key = "datafusion.optimizer.filter_null_join_keys"
-
- assert config.get(config_key).as_py() is False
-
- config.set(config_key, True)
- assert config.get(config_key).as_py() is True
-
-
-def test_get_all(config):
- config.get_all()
-
+# This script runs all the Python lints locally the same way the
+# DataFusion CI does
-def test_get_invalid_config(config):
- assert config.get("not.valid.key") is None
+set -e
+source venv/bin/activate
+flake8 --exclude venv --ignore=E501,W503
+black --line-length 79 .
diff --git a/src/catalog.rs b/src/catalog.rs
index d7a6b8a..2929a88 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -21,6 +21,8 @@ use std::sync::Arc;
use pyo3::exceptions::PyKeyError;
use pyo3::prelude::*;
+use crate::errors::DataFusionError;
+use crate::utils::wait_for_future;
use datafusion::{
arrow::pyarrow::PyArrowConvert,
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
@@ -88,13 +90,11 @@ impl PyDatabase {
self.database.table_names().into_iter().collect()
}
- fn table(&self, name: &str) -> PyResult<PyTable> {
- match self.database.table(name) {
- Some(table) => Ok(PyTable::new(table)),
- None => Err(PyKeyError::new_err(format!(
- "Table with name {} doesn't exist.",
- name
- ))),
+ fn table(&self, name: &str, py: Python) -> PyResult<PyTable> {
+ if let Some(table) = wait_for_future(py, self.database.table(name)) {
+ Ok(PyTable::new(table))
+ } else {
+ Err(DataFusionError::Common(format!("Table not found: {}", name)).into())
}
}
diff --git a/src/config.rs b/src/config.rs
index 05a6a0c..d5d5777 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -38,27 +38,37 @@ impl PyConfig {
/// Get configurations from environment variables
#[staticmethod]
- pub fn from_env() -> Self {
- Self {
- config: ConfigOptions::from_env(),
- }
+ pub fn from_env() -> PyResult<Self> {
+ Ok(Self {
+ config: ConfigOptions::from_env()?,
+ })
}
/// Get a configuration option
pub fn get(&mut self, key: &str, py: Python) -> PyResult<PyObject> {
- Ok(self.config.get(key).into_py(py))
+ let options = self.config.to_owned();
+ for entry in options.entries() {
+ if entry.key == key {
+ return Ok(entry.value.into_py(py));
+ }
+ }
+ Ok(None::<String>.into_py(py))
}
/// Set a configuration option
- pub fn set(&mut self, key: &str, value: PyObject, py: Python) {
- self.config.set(key, py_obj_to_scalar_value(py, value))
+ pub fn set(&mut self, key: &str, value: PyObject, py: Python) -> PyResult<()> {
+ let scalar_value = py_obj_to_scalar_value(py, value);
+ self.config
+ .set(key, scalar_value.to_string().as_str())
+ .map_err(|e| e.into())
}
/// Get all configuration options
pub fn get_all(&mut self, py: Python) -> PyResult<PyObject> {
let dict = PyDict::new(py);
- for (key, value) in self.config.options() {
- dict.set_item(key, value.clone().into_py(py))?;
+ let options = self.config.to_owned();
+ for entry in options.entries() {
+ dict.set_item(entry.key, entry.value.clone().into_py(py))?;
}
Ok(dict.into())
}
diff --git a/src/context.rs b/src/context.rs
index 344d5c7..2ffa3da 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -25,8 +25,6 @@ use uuid::Uuid;
use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::prelude::*;
-use parking_lot::RwLock;
-
use crate::catalog::{PyCatalog, PyTable};
use crate::dataframe::PyDataFrame;
use crate::dataset::Dataset;
@@ -38,11 +36,13 @@ use crate::utils::wait_for_future;
use datafusion::arrow::datatypes::{DataType, Schema};
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::config::ConfigOptions;
use datafusion::datasource::datasource::TableProvider;
use datafusion::datasource::MemTable;
use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::prelude::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
+use datafusion::prelude::{
+ AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions,
+};
+use datafusion_common::ScalarValue;
/// `PySessionContext` is able to plan and execute DataFusion plans.
/// It has a powerful optimizer, a physical planner for local execution, and a
@@ -79,41 +79,32 @@ impl PySessionContext {
parquet_pruning: bool,
target_partitions: Option<usize>,
config_options: Option<HashMap<String, String>>,
- ) -> Self {
- let mut options = ConfigOptions::from_env();
- if let Some(hash_map) = config_options {
- for (k, v) in &hash_map {
- if let Ok(v) = v.parse::<bool>() {
- options.set_bool(k, v);
- } else if let Ok(v) = v.parse::<u64>() {
- options.set_u64(k, v);
- } else {
- options.set_string(k, v);
- }
- }
- }
- let config_options = Arc::new(RwLock::new(options));
-
+ ) -> PyResult<Self> {
let mut cfg = SessionConfig::new()
- .create_default_catalog_and_schema(create_default_catalog_and_schema)
- .with_default_catalog_and_schema(default_catalog, default_schema)
.with_information_schema(information_schema)
.with_repartition_joins(repartition_joins)
.with_repartition_aggregations(repartition_aggregations)
.with_repartition_windows(repartition_windows)
.with_parquet_pruning(parquet_pruning);
- // TODO we should add a `with_config_options` to `SessionConfig`
- cfg.config_options = config_options;
+ if create_default_catalog_and_schema {
+ cfg = cfg.with_default_catalog_and_schema(default_catalog, default_schema);
+ }
+
+ if let Some(hash_map) = config_options {
+ for (k, v) in &hash_map {
+ cfg = cfg.set(k, ScalarValue::Utf8(Some(v.clone())));
+ }
+ }
let cfg_full = match target_partitions {
None => cfg,
Some(x) => cfg.with_target_partitions(x),
};
- PySessionContext {
+ Ok(PySessionContext {
ctx: SessionContext::with_config(cfg_full),
- }
+ })
}
/// Register a an object store with the given name
@@ -161,6 +152,7 @@ impl PySessionContext {
fn create_dataframe(
&mut self,
partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
+ py: Python,
) -> PyResult<PyDataFrame> {
let schema = partitions.0[0][0].schema();
let table = MemTable::try_new(schema, partitions.0).map_err(DataFusionError::from)?;
@@ -175,7 +167,8 @@ impl PySessionContext {
self.ctx
.register_table(&*name, Arc::new(table))
.map_err(DataFusionError::from)?;
- let table = self.ctx.table(&*name).map_err(DataFusionError::from)?;
+
+ let table = wait_for_future(py, self._table(&name)).map_err(DataFusionError::from)?;
let df = PyDataFrame::new(table);
Ok(df)
@@ -311,8 +304,9 @@ impl PySessionContext {
self.ctx.tables().unwrap()
}
- fn table(&self, name: &str) -> PyResult<PyDataFrame> {
- Ok(PyDataFrame::new(self.ctx.table(name)?))
+ fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
+ let x = wait_for_future(py, self.ctx.table(name)).map_err(DataFusionError::from)?;
+ Ok(PyDataFrame::new(x))
}
fn table_exist(&self, name: &str) -> PyResult<bool> {
@@ -348,12 +342,16 @@ impl PySessionContext {
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
let mut options = NdJsonReadOptions::default()
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
- options.schema = schema.map(|s| Arc::new(s.0));
options.schema_infer_max_records = schema_infer_max_records;
options.file_extension = file_extension;
-
- let result = self.ctx.read_json(path, options);
- let df = wait_for_future(py, result).map_err(DataFusionError::from)?;
+ let df = if let Some(schema) = schema {
+ options.schema = Some(&schema.0);
+ let result = self.ctx.read_json(path, options);
+ wait_for_future(py, result).map_err(DataFusionError::from)?
+ } else {
+ let result = self.ctx.read_json(path, options);
+ wait_for_future(py, result).map_err(DataFusionError::from)?
+ };
Ok(PyDataFrame::new(df))
}
@@ -451,11 +449,21 @@ impl PySessionContext {
let mut options = AvroReadOptions::default()
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
options.file_extension = file_extension;
- options.schema = schema.map(|s| Arc::new(s.0));
+ let df = if let Some(schema) = schema {
+ options.schema = Some(&schema.0);
+ let read_future = self.ctx.read_avro(path, options);
+ wait_for_future(py, read_future).map_err(DataFusionError::from)?
+ } else {
+ let read_future = self.ctx.read_avro(path, options);
+ wait_for_future(py, read_future).map_err(DataFusionError::from)?
+ };
+ Ok(PyDataFrame::new(df))
+ }
+}
- let result = self.ctx.read_avro(path, options);
- let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
- Ok(df)
+impl PySessionContext {
+ async fn _table(&self, name: &str) -> datafusion_common::Result<DataFrame> {
+ self.ctx.table(name).await
}
}
diff --git a/src/dataframe.rs b/src/dataframe.rs
index ab8f2c4..b56cadf 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -38,8 +38,8 @@ pub(crate) struct PyDataFrame {
impl PyDataFrame {
/// creates a new PyDataFrame
- pub fn new(df: Arc<DataFrame>) -> Self {
- Self { df }
+ pub fn new(df: DataFrame) -> Self {
+ Self { df: Arc::new(df) }
}
}
@@ -71,50 +71,54 @@ impl PyDataFrame {
#[args(args = "*")]
fn select_columns(&self, args: Vec<&str>) -> PyResult<Self> {
- let df = self.df.select_columns(&args)?;
+ let df = self.df.as_ref().clone().select_columns(&args)?;
Ok(Self::new(df))
}
#[args(args = "*")]
fn select(&self, args: Vec<PyExpr>) -> PyResult<Self> {
let expr = args.into_iter().map(|e| e.into()).collect();
- let df = self.df.select(expr)?;
+ let df = self.df.as_ref().clone().select(expr)?;
Ok(Self::new(df))
}
fn filter(&self, predicate: PyExpr) -> PyResult<Self> {
- let df = self.df.filter(predicate.into())?;
+ let df = self.df.as_ref().clone().filter(predicate.into())?;
Ok(Self::new(df))
}
fn with_column(&self, name: &str, expr: PyExpr) -> PyResult<Self> {
- let df = self.df.with_column(name, expr.into())?;
+ let df = self.df.as_ref().clone().with_column(name, expr.into())?;
Ok(Self::new(df))
}
/// Rename one column by applying a new projection. This is a no-op if the column to be
/// renamed does not exist.
fn with_column_renamed(&self, old_name: &str, new_name: &str) -> PyResult<Self> {
- let df = self.df.with_column_renamed(old_name, new_name)?;
+ let df = self
+ .df
+ .as_ref()
+ .clone()
+ .with_column_renamed(old_name, new_name)?;
Ok(Self::new(df))
}
fn aggregate(&self, group_by: Vec<PyExpr>, aggs: Vec<PyExpr>) -> PyResult<Self> {
let group_by = group_by.into_iter().map(|e| e.into()).collect();
let aggs = aggs.into_iter().map(|e| e.into()).collect();
- let df = self.df.aggregate(group_by, aggs)?;
+ let df = self.df.as_ref().clone().aggregate(group_by, aggs)?;
Ok(Self::new(df))
}
#[args(exprs = "*")]
fn sort(&self, exprs: Vec<PyExpr>) -> PyResult<Self> {
let exprs = exprs.into_iter().map(|e| e.into()).collect();
- let df = self.df.sort(exprs)?;
+ let df = self.df.as_ref().clone().sort(exprs)?;
Ok(Self::new(df))
}
fn limit(&self, count: usize) -> PyResult<Self> {
- let df = self.df.limit(0, Some(count))?;
+ let df = self.df.as_ref().clone().limit(0, Some(count))?;
Ok(Self::new(df))
}
@@ -122,7 +126,7 @@ impl PyDataFrame {
/// Unless some order is specified in the plan, there is no
/// guarantee of the order of the result.
fn collect(&self, py: Python) -> PyResult<Vec<PyObject>> {
- let batches = wait_for_future(py, self.df.collect())?;
+ let batches = wait_for_future(py, self.df.as_ref().clone().collect())?;
// cannot use PyResult<Vec<RecordBatch>> return type due to
// https://github.com/PyO3/pyo3/issues/1813
batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
@@ -130,14 +134,14 @@ impl PyDataFrame {
/// Cache DataFrame.
fn cache(&self, py: Python) -> PyResult<Self> {
- let df = wait_for_future(py, self.df.cache())?;
+ let df = wait_for_future(py, self.df.as_ref().clone().cache())?;
Ok(Self::new(df))
}
/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
/// maintaining the input partitioning.
fn collect_partitioned(&self, py: Python) -> PyResult<Vec<Vec<PyObject>>> {
- let batches = wait_for_future(py, self.df.collect_partitioned())?;
+ let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())?;
batches
.into_iter()
@@ -148,14 +152,14 @@ impl PyDataFrame {
/// Print the result, 20 lines by default
#[args(num = "20")]
fn show(&self, py: Python, num: usize) -> PyResult<()> {
- let df = self.df.limit(0, Some(num))?;
+ let df = self.df.as_ref().clone().limit(0, Some(num))?;
let batches = wait_for_future(py, df.collect())?;
pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
}
/// Filter out duplicate rows
fn distinct(&self) -> PyResult<Self> {
- let df = self.df.distinct()?;
+ let df = self.df.as_ref().clone().distinct()?;
Ok(Self::new(df))
}
@@ -181,23 +185,31 @@ impl PyDataFrame {
}
};
- let df = self
- .df
- .join(right.df, join_type, &join_keys.0, &join_keys.1, None)?;
+ let df = self.df.as_ref().clone().join(
+ right.df.as_ref().clone(),
+ join_type,
+ &join_keys.0,
+ &join_keys.1,
+ None,
+ )?;
Ok(Self::new(df))
}
/// Print the query plan
#[args(verbose = false, analyze = false)]
fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyResult<()> {
- let df = self.df.explain(verbose, analyze)?;
+ let df = self.df.as_ref().clone().explain(verbose, analyze)?;
let batches = wait_for_future(py, df.collect())?;
pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
}
/// Repartition a `DataFrame` based on a logical partitioning scheme.
fn repartition(&self, num: usize) -> PyResult<Self> {
- let new_df = self.df.repartition(Partitioning::RoundRobinBatch(num))?;
+ let new_df = self
+ .df
+ .as_ref()
+ .clone()
+ .repartition(Partitioning::RoundRobinBatch(num))?;
Ok(Self::new(new_df))
}
@@ -205,7 +217,11 @@ impl PyDataFrame {
#[args(args = "*", num)]
fn repartition_by_hash(&self, args: Vec<PyExpr>, num: usize) -> PyResult<Self> {
let expr = args.into_iter().map(|py_expr| py_expr.into()).collect();
- let new_df = self.df.repartition(Partitioning::Hash(expr, num))?;
+ let new_df = self
+ .df
+ .as_ref()
+ .clone()
+ .repartition(Partitioning::Hash(expr, num))?;
Ok(Self::new(new_df))
}
@@ -214,9 +230,12 @@ impl PyDataFrame {
#[args(distinct = false)]
fn union(&self, py_df: PyDataFrame, distinct: bool) -> PyResult<Self> {
let new_df = if distinct {
- self.df.union_distinct(py_df.df)?
+ self.df
+ .as_ref()
+ .clone()
+ .union_distinct(py_df.df.as_ref().clone())?
} else {
- self.df.union(py_df.df)?
+ self.df.as_ref().clone().union(py_df.df.as_ref().clone())?
};
Ok(Self::new(new_df))
@@ -225,37 +244,45 @@ impl PyDataFrame {
/// Calculate the distinct union of two `DataFrame`s. The
/// two `DataFrame`s must have exactly the same schema
fn union_distinct(&self, py_df: PyDataFrame) -> PyResult<Self> {
- let new_df = self.df.union_distinct(py_df.df)?;
+ let new_df = self
+ .df
+ .as_ref()
+ .clone()
+ .union_distinct(py_df.df.as_ref().clone())?;
Ok(Self::new(new_df))
}
/// Calculate the intersection of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
fn intersect(&self, py_df: PyDataFrame) -> PyResult<Self> {
- let new_df = self.df.intersect(py_df.df)?;
+ let new_df = self
+ .df
+ .as_ref()
+ .clone()
+ .intersect(py_df.df.as_ref().clone())?;
Ok(Self::new(new_df))
}
/// Calculate the exception of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
fn except_all(&self, py_df: PyDataFrame) -> PyResult<Self> {
- let new_df = self.df.except(py_df.df)?;
+ let new_df = self.df.as_ref().clone().except(py_df.df.as_ref().clone())?;
Ok(Self::new(new_df))
}
/// Write a `DataFrame` to a CSV file.
fn write_csv(&self, path: &str, py: Python) -> PyResult<()> {
- wait_for_future(py, self.df.write_csv(path))?;
+ wait_for_future(py, self.df.as_ref().clone().write_csv(path))?;
Ok(())
}
/// Write a `DataFrame` to a Parquet file.
fn write_parquet(&self, path: &str, py: Python) -> PyResult<()> {
- wait_for_future(py, self.df.write_parquet(path, None))?;
+ wait_for_future(py, self.df.as_ref().clone().write_parquet(path, None))?;
Ok(())
}
/// Executes a query and writes the results to a partitioned JSON file.
fn write_json(&self, path: &str, py: Python) -> PyResult<()> {
- wait_for_future(py, self.df.write_json(path))?;
+ wait_for_future(py, self.df.as_ref().clone().write_json(path))?;
Ok(())
}
}
diff --git a/src/dataset_exec.rs b/src/dataset_exec.rs
index 9c41218..be6dc1e 100644
--- a/src/dataset_exec.rs
+++ b/src/dataset_exec.rs
@@ -162,10 +162,6 @@ impl ExecutionPlan for DatasetExec {
})
}
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
diff --git a/src/functions.rs b/src/functions.rs
index 5a0a20d..d40ffbe 100644
--- a/src/functions.rs
+++ b/src/functions.rs
@@ -17,10 +17,12 @@
use pyo3::{prelude::*, wrap_pyfunction};
-use datafusion::physical_plan::aggregates::AggregateFunction;
-use datafusion_expr::{lit, BuiltinScalarFunction};
+use datafusion_expr::expr::AggregateFunction;
+use datafusion_expr::expr::{Sort, WindowFunction};
+use datafusion_expr::window_function::find_df_window_func;
+use datafusion_expr::{lit, BuiltinScalarFunction, WindowFrame};
-use crate::errors;
+use crate::errors::DataFusionError;
use crate::expression::PyExpr;
#[pyfunction]
@@ -63,11 +65,11 @@ fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
#[pyfunction]
fn order_by(expr: PyExpr, asc: Option<bool>, nulls_first: Option<bool>) -> PyResult<PyExpr> {
Ok(PyExpr {
- expr: datafusion_expr::Expr::Sort {
+ expr: datafusion_expr::Expr::Sort(Sort {
expr: Box::new(expr.expr),
asc: asc.unwrap_or(true),
nulls_first: nulls_first.unwrap_or(true),
- },
+ }),
})
}
@@ -87,11 +89,14 @@ fn window(
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PyExpr>>,
) -> PyResult<PyExpr> {
- use std::str::FromStr;
- let fun = datafusion_expr::window_function::WindowFunction::from_str(name)
- .map_err(|e| -> errors::DataFusionError { e.into() })?;
+ let fun = find_df_window_func(name);
+ if fun.is_none() {
+ return Err(DataFusionError::Common("window function not found".to_string()).into());
+ }
+ let fun = fun.unwrap();
+ let window_frame = WindowFrame::new(order_by.is_some());
Ok(PyExpr {
- expr: datafusion_expr::Expr::WindowFunction {
+ expr: datafusion_expr::Expr::WindowFunction(WindowFunction {
fun,
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
partition_by: partition_by
@@ -104,8 +109,8 @@ fn window(
.into_iter()
.map(|x| x.expr)
.collect::<Vec<_>>(),
- window_frame: None,
- },
+ window_frame,
+ }),
})
}
@@ -135,12 +140,12 @@ macro_rules! aggregate_function {
#[doc = $DOC]
#[pyfunction(args = "*", distinct = "false")]
fn $NAME(args: Vec<PyExpr>, distinct: bool) -> PyExpr {
- let expr = datafusion_expr::Expr::AggregateFunction {
- fun: AggregateFunction::$FUNC,
+ let expr = datafusion_expr::Expr::AggregateFunction(AggregateFunction {
+ fun: datafusion_expr::aggregate_function::AggregateFunction::$FUNC,
args: args.into_iter().map(|e| e.into()).collect(),
distinct,
filter: None,
- };
+ });
expr.into()
}
};
diff --git a/src/udaf.rs b/src/udaf.rs
index 66dc274..9d96e2c 100644
--- a/src/udaf.rs
+++ b/src/udaf.rs
@@ -24,9 +24,7 @@ use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
use datafusion::common::ScalarValue;
use datafusion::error::{DataFusionError, Result};
-use datafusion_expr::{
- create_udaf, Accumulator, AccumulatorFunctionImplementation, AggregateState, AggregateUDF,
-};
+use datafusion_expr::{create_udaf, Accumulator, AccumulatorFunctionImplementation, AggregateUDF};
use crate::expression::PyExpr;
use crate::utils::parse_volatility;
@@ -43,13 +41,9 @@ impl RustAccumulator {
}
impl Accumulator for RustAccumulator {
- fn state(&self) -> Result<Vec<AggregateState>> {
- let py_result: PyResult<Vec<ScalarValue>> =
- Python::with_gil(|py| self.accum.as_ref(py).call_method0("state")?.extract());
- match py_result {
- Ok(r) => Ok(r.into_iter().map(AggregateState::Scalar).collect()),
- Err(e) => Err(DataFusionError::Execution(format!("{}", e))),
- }
+ fn state(&self) -> Result<Vec<ScalarValue>> {
+ Python::with_gil(|py| self.accum.as_ref(py).call_method0("state")?.extract())
+ .map_err(|e| DataFusionError::Execution(format!("{}", e)))
}
fn evaluate(&self) -> Result<ScalarValue> {