Posted to commits@arrow.apache.org by ag...@apache.org on 2023/01/19 01:17:51 UTC

[arrow-datafusion-python] branch master updated: Upgrade to DataFusion 16.0.0 (#115)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 575367c  Upgrade to DataFusion 16.0.0 (#115)
575367c is described below

commit 575367c965a5d1257846dccc3a0ef00a8fd5c5ce
Author: Andy Grove <an...@gmail.com>
AuthorDate: Wed Jan 18 18:17:45 2023 -0700

    Upgrade to DataFusion 16.0.0 (#115)
---
 .github/workflows/test.yaml                        |   2 +-
 Cargo.lock                                         | 327 ++++++---------------
 Cargo.toml                                         |  11 +-
 datafusion/tests/test_config.py                    |  12 +-
 datafusion/tests/test_dataframe.py                 |   3 +
 .../tests/test_config.py => dev/python_lint.sh     |  31 +-
 src/catalog.rs                                     |  14 +-
 src/config.rs                                      |  28 +-
 src/context.rs                                     |  80 ++---
 src/dataframe.rs                                   |  85 ++++--
 src/dataset_exec.rs                                |   4 -
 src/functions.rs                                   |  33 ++-
 src/udaf.rs                                        |  14 +-
 13 files changed, 264 insertions(+), 380 deletions(-)

diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index a416fbd..327b1cf 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -96,7 +96,7 @@ jobs:
         if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
         run: |
           source venv/bin/activate
-          flake8 --exclude venv --ignore=E501
+          flake8 --exclude venv --ignore=E501,W503
           black --line-length 79 --diff --check .
 
       - name: Run tests
diff --git a/Cargo.lock b/Cargo.lock
index e4e14f9..0f3fbea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -101,9 +101,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
 
 [[package]]
 name = "arrow"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aed9849f86164fad5cb66ce4732782b15f1bc97f8febab04e782c20cce9d4b6c"
+checksum = "2fe17dc0113da7e2eaeaedbd304d347aa8ea64916d225b79a5c3f3b6b5d8da4c"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -113,8 +113,10 @@ dependencies = [
  "arrow-data",
  "arrow-ipc",
  "arrow-json",
+ "arrow-ord",
  "arrow-schema",
  "arrow-select",
+ "arrow-string",
  "bitflags",
  "chrono",
  "comfy-table",
@@ -124,14 +126,13 @@ dependencies = [
  "num",
  "pyo3",
  "regex",
- "regex-syntax",
 ]
 
 [[package]]
 name = "arrow-array"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b8504cf0a6797e908eecf221a865e7d339892720587f87c8b90262863015b08"
+checksum = "b9452131e027aec3276e43449162af084db611c42ef875e54d231e6580bc6254"
 dependencies = [
  "ahash",
  "arrow-buffer",
@@ -145,9 +146,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-buffer"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d6de64a27cea684b24784647d9608314bc80f7c4d55acb44a425e05fab39d916"
+checksum = "4a301001e8ed7da638a12fa579ac5f3f154c44c0655f2ca6ed0f8586b418a779"
 dependencies = [
  "half",
  "num",
@@ -155,9 +156,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-cast"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bec4a54502eefe05923c385c90a005d69474fa06ca7aa2a2b123c9f9532f6178"
+checksum = "048c91d067f2eb8cc327f086773e5b0f0d7714780807fc4db09366584e23bac8"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -171,9 +172,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-csv"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7902bbf8127eac48554fe902775303377047ad49a9fd473c2b8cb399d092080"
+checksum = "ed914cd0006a3bb9cac8136b3098ac7796ad26b82362f00d4f2e7c1a54684b86"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -189,9 +190,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-data"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7e4882efe617002449d5c6b5de9ddb632339074b36df8a96ea7147072f1faa8a"
+checksum = "e59619d9d102e4e6b22087b2bd60c07df76fcb68683620841718f6bc8e8f02cb"
 dependencies = [
  "arrow-buffer",
  "arrow-schema",
@@ -201,9 +202,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ipc"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa0703a6de2785828561b03a4d7793ecd333233e1b166316b4bfc7cfce55a4a7"
+checksum = "fb7ad6d2fa06a1cebdaa213c59fc953b9230e560d8374aba133b572b864ec55e"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -215,9 +216,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-json"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3bd23fc8c6d251f96cd63b96fece56bbb9710ce5874a627cb786e2600673595a"
+checksum = "1e22efab3ad70336057660c5e5f2b72e2417e3444c27cb42dc477d678ddd6979"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -231,17 +232,31 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "arrow-ord"
+version = "29.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e23b623332804a65ad11e7732c351896dcb132c19f8e25d99fdb13b00aae5206"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+ "num",
+]
+
 [[package]]
 name = "arrow-schema"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "da9f143882a80be168538a60e298546314f50f11f2a288c8d73e11108da39d26"
+checksum = "69ef17c144f1253b9864f5a3e8f4c6f1e436bdd52394855d5942f132f776b64e"
 
 [[package]]
 name = "arrow-select"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "520406331d4ad60075359524947ebd804e479816439af82bcb17f8d280d9b38c"
+checksum = "e2accaf218ff107e3df0ee8f1e09b092249a1cc741c4377858a1470fd27d7096"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -250,6 +265,21 @@ dependencies = [
  "num",
 ]
 
+[[package]]
+name = "arrow-string"
+version = "29.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4a0954f9e1f45b04815ddacbde72899bf3c03a08fa6c0375f42178c4a01a510"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+ "regex",
+ "regex-syntax",
+]
+
 [[package]]
 name = "async-compression"
 version = "0.3.15"
@@ -433,43 +463,6 @@ dependencies = [
  "winapi",
 ]
 
-[[package]]
-name = "clap"
-version = "4.0.29"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4d63b9e9c07271b9957ad22c173bae2a4d9a81127680962039296abcd2f8251d"
-dependencies = [
- "bitflags",
- "clap_derive",
- "clap_lex",
- "is-terminal",
- "once_cell",
- "strsim",
- "termcolor",
-]
-
-[[package]]
-name = "clap_derive"
-version = "4.0.21"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014"
-dependencies = [
- "heck",
- "proc-macro-error",
- "proc-macro2",
- "quote",
- "syn",
-]
-
-[[package]]
-name = "clap_lex"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8"
-dependencies = [
- "os_str_bytes",
-]
-
 [[package]]
 name = "codespan-reporting"
 version = "0.11.1"
@@ -640,9 +633,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion"
-version = "15.0.0"
+version = "16.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b75a088adf79515b04fd3895c1a14dc249c8f7a7f27b59870a05546fe9a55542"
+checksum = "f166d67281ee90d0b35d93a9d13b3b32c26f35348da9e5101a12e9de6093bd06"
 dependencies = [
  "ahash",
  "apache-avro",
@@ -663,6 +656,7 @@ dependencies = [
  "futures",
  "glob",
  "hashbrown 0.13.1",
+ "indexmap",
  "itertools",
  "lazy_static",
  "log",
@@ -677,7 +671,6 @@ dependencies = [
  "pyo3",
  "rand",
  "smallvec",
- "sqllogictest",
  "sqlparser",
  "tempfile",
  "tokio",
@@ -690,13 +683,14 @@ dependencies = [
 
 [[package]]
 name = "datafusion-common"
-version = "15.0.0"
+version = "16.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7b17262b899f79afdf502846d1138a8b48441afe24dc6e07c922105289248137"
+checksum = "5cb2144c73ca974b00bf735a6e8692efe22c65731097bc49018a0edfbd1d0120"
 dependencies = [
  "apache-avro",
  "arrow",
  "chrono",
+ "num_cpus",
  "object_store",
  "parquet",
  "pyo3",
@@ -705,9 +699,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr"
-version = "15.0.0"
+version = "16.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "533d2226b4636a1306d1f6f4ac02e436947c5d6e8bfc85f6d8f91a425c10a407"
+checksum = "b6b988765372fdee77d805dda00cb7ffb28dfda831cc0b79aff9e09527b70402"
 dependencies = [
  "ahash",
  "arrow",
@@ -718,9 +712,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-optimizer"
-version = "15.0.0"
+version = "16.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce7ba274267b6baf1714a67727249aa56d648c8814b0f4c43387fbe6d147e619"
+checksum = "50b44aa5128e3e6f76d7ae8efc3a595133abd74aa60c3f50f39f93ea00ef302c"
 dependencies = [
  "arrow",
  "async-trait",
@@ -730,13 +724,14 @@ dependencies = [
  "datafusion-physical-expr",
  "hashbrown 0.13.1",
  "log",
+ "regex-syntax",
 ]
 
 [[package]]
 name = "datafusion-physical-expr"
-version = "15.0.0"
+version = "16.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f35cb53e6c2f9c40accdf45aef2be7fde030ea3051b1145a059d96109e65b0bf"
+checksum = "0fd7e12ce4d61eb698a83fde3bf14417fe67a23d7131b1f00c28dc70183aeed7"
 dependencies = [
  "ahash",
  "arrow",
@@ -750,6 +745,7 @@ dependencies = [
  "datafusion-row",
  "half",
  "hashbrown 0.13.1",
+ "indexmap",
  "itertools",
  "lazy_static",
  "md-5",
@@ -777,15 +773,16 @@ dependencies = [
  "parking_lot",
  "pyo3",
  "rand",
+ "regex-syntax",
  "tokio",
  "uuid 0.8.2",
 ]
 
 [[package]]
 name = "datafusion-row"
-version = "15.0.0"
+version = "16.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "27c77b1229ae5cf6a6e0e2ba43ed4e98131dbf1cc4a97fad17c94230b32e0812"
+checksum = "e34f6c09f1458190bb90305d70c2075bf2dd4cf8c51a65d5635e5217a3bb8bff"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -795,22 +792,17 @@ dependencies = [
 
 [[package]]
 name = "datafusion-sql"
-version = "15.0.0"
+version = "16.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "569423fa8a50db39717080949e3b4f8763582b87baf393cc3fcf27cc21467ba7"
+checksum = "b42f29fd2c98e0e0030db4638f971c91145ef5e67ab139f0426b2891e14b9bf5"
 dependencies = [
  "arrow-schema",
  "datafusion-common",
  "datafusion-expr",
+ "log",
  "sqlparser",
 ]
 
-[[package]]
-name = "difference"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
-
 [[package]]
 name = "digest"
 version = "0.10.5"
@@ -843,27 +835,6 @@ dependencies = [
  "cfg-if",
 ]
 
-[[package]]
-name = "errno"
-version = "0.2.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1"
-dependencies = [
- "errno-dragonfly",
- "libc",
- "winapi",
-]
-
-[[package]]
-name = "errno-dragonfly"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
-dependencies = [
- "cc",
- "libc",
-]
-
 [[package]]
 name = "fastrand"
 version = "1.8.0"
@@ -1083,15 +1054,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "hermit-abi"
-version = "0.2.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
-dependencies = [
- "libc",
-]
-
 [[package]]
 name = "http"
 version = "0.2.8"
@@ -1126,12 +1088,6 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
 
-[[package]]
-name = "humantime"
-version = "2.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
-
 [[package]]
 name = "hyper"
 version = "0.14.20"
@@ -1205,9 +1161,9 @@ dependencies = [
 
 [[package]]
 name = "indexmap"
-version = "1.9.1"
+version = "1.9.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
+checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
 dependencies = [
  "autocfg",
  "hashbrown 0.12.3",
@@ -1234,34 +1190,12 @@ version = "3.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
 
-[[package]]
-name = "io-lifetimes"
-version = "1.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c"
-dependencies = [
- "libc",
- "windows-sys 0.42.0",
-]
-
 [[package]]
 name = "ipnet"
 version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b"
 
-[[package]]
-name = "is-terminal"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "927609f78c2913a6f6ac3c27a4fe87f43e2a35367c0c4b0f8265e8f49a104330"
-dependencies = [
- "hermit-abi 0.2.6",
- "io-lifetimes",
- "rustix",
- "windows-sys 0.42.0",
-]
-
 [[package]]
 name = "itertools"
 version = "0.10.5"
@@ -1413,17 +1347,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "libtest-mimic"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d7b603516767d1ab23d0de09d023e62966c3322f7148297c35cf3d97aa8b37fa"
-dependencies = [
- "clap",
- "termcolor",
- "threadpool",
-]
-
 [[package]]
 name = "link-cplusplus"
 version = "1.0.7"
@@ -1433,12 +1356,6 @@ dependencies = [
  "cc",
 ]
 
-[[package]]
-name = "linux-raw-sys"
-version = "0.1.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f"
-
 [[package]]
 name = "lock_api"
 version = "0.4.9"
@@ -1652,7 +1569,7 @@ version = "1.13.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
 dependencies = [
- "hermit-abi 0.1.19",
+ "hermit-abi",
  "libc",
 ]
 
@@ -1699,12 +1616,6 @@ dependencies = [
  "num-traits",
 ]
 
-[[package]]
-name = "os_str_bytes"
-version = "6.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee"
-
 [[package]]
 name = "parking_lot"
 version = "0.12.1"
@@ -1730,9 +1641,9 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "28.0.0"
+version = "29.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "21433e9209111bb3720b747f2f137e0d115af1af0420a7a1c26b6e88227fa353"
+checksum = "d906343fd18ace6b998d5074697743e8e9358efa8c3c796a1381b98cba813338"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -1797,30 +1708,6 @@ version = "0.2.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
 
-[[package]]
-name = "proc-macro-error"
-version = "1.0.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
-dependencies = [
- "proc-macro-error-attr",
- "proc-macro2",
- "quote",
- "syn",
- "version_check",
-]
-
-[[package]]
-name = "proc-macro-error-attr"
-version = "1.0.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
-dependencies = [
- "proc-macro2",
- "quote",
- "version_check",
-]
-
 [[package]]
 name = "proc-macro-hack"
 version = "0.5.19"
@@ -1979,9 +1866,9 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
 
 [[package]]
 name = "regex-syntax"
-version = "0.6.27"
+version = "0.6.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244"
+checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
 
 [[package]]
 name = "remove_dir_all"
@@ -2053,20 +1940,6 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
 
-[[package]]
-name = "rustix"
-version = "0.36.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb93e85278e08bb5788653183213d3a60fc242b10cb9be96586f5a73dcb67c23"
-dependencies = [
- "bitflags",
- "errno",
- "io-lifetimes",
- "libc",
- "linux-raw-sys",
- "windows-sys 0.42.0",
-]
-
 [[package]]
 name = "rustls"
 version = "0.20.6"
@@ -2251,31 +2124,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
 
 [[package]]
-name = "sqllogictest"
-version = "0.8.0"
+name = "sqlparser"
+version = "0.30.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba41e01d229d7725401de371e323851f82d839d68732a06162405362b60852fe"
+checksum = "db67dc6ef36edb658196c3fef0464a80b53dbbc194a904e81f9bd4190f9ecc5b"
 dependencies = [
- "async-trait",
- "difference",
- "futures",
- "glob",
- "humantime",
- "itertools",
- "libtest-mimic",
- "regex",
- "tempfile",
- "thiserror",
- "tracing",
+ "log",
+ "sqlparser_derive",
 ]
 
 [[package]]
-name = "sqlparser"
-version = "0.27.0"
+name = "sqlparser_derive"
+version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aba319938d4bfe250a769ac88278b629701024fe16f34257f9563bc628081970"
+checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
 dependencies = [
- "log",
+ "proc-macro2",
+ "quote",
+ "syn",
 ]
 
 [[package]]
@@ -2284,12 +2150,6 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
 
-[[package]]
-name = "strsim"
-version = "0.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
-
 [[package]]
 name = "strum"
 version = "0.24.1"
@@ -2375,15 +2235,6 @@ dependencies = [
  "syn",
 ]
 
-[[package]]
-name = "threadpool"
-version = "1.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
-dependencies = [
- "num_cpus",
-]
-
 [[package]]
 name = "thrift"
 version = "0.17.0"
diff --git a/Cargo.toml b/Cargo.toml
index 8c32d7e..9569384 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,7 +25,7 @@ description = "Apache Arrow DataFusion DataFrame and SQL Query Engine"
 readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
-rust-version = "1.57"
+rust-version = "1.62"
 
 [features]
 default = ["mimalloc"]
@@ -34,16 +34,17 @@ default = ["mimalloc"]
 tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 rand = "0.8"
 pyo3 = { version = "~0.17.3", features = ["extension-module", "abi3", "abi3-py37"] }
-datafusion = { version = "^15.0.0", features = ["pyarrow", "avro"] }
-datafusion-expr = { version = "^15.0.0" }
-datafusion-optimizer = { version = "^15.0.0" }
-datafusion-common = { version = "^15.0.0", features = ["pyarrow"] }
+datafusion = { version = "16.0.0", features = ["pyarrow", "avro"] }
+datafusion-expr = { version = "16.0.0" }
+datafusion-optimizer = { version = "16.0.0" }
+datafusion-common = { version = "16.0.0", features = ["pyarrow"] }
 uuid = { version = "0.8", features = ["v4"] }
 mimalloc = { version = "*", optional = true, default-features = false }
 async-trait = "0.1"
 futures = "0.3"
 object_store = { version = "0.5.3", features = ["aws", "gcp", "azure"] }
 parking_lot = "0.12"
+regex-syntax = "0.6.28"
 
 [lib]
 name = "datafusion_python"
diff --git a/datafusion/tests/test_config.py b/datafusion/tests/test_config.py
index 1e06161..960e72c 100644
--- a/datafusion/tests/test_config.py
+++ b/datafusion/tests/test_config.py
@@ -27,14 +27,18 @@ def config():
 def test_get_then_set(config):
     config_key = "datafusion.optimizer.filter_null_join_keys"
 
-    assert config.get(config_key).as_py() is False
+    assert config.get(config_key) == "false"
 
-    config.set(config_key, True)
-    assert config.get(config_key).as_py() is True
+    config.set(config_key, "true")
+    assert config.get(config_key) == "true"
 
 
 def test_get_all(config):
-    config.get_all()
+    config_dict = config.get_all()
+    assert (
+        config_dict["datafusion.catalog.create_default_catalog_and_schema"]
+        == "true"
+    )
 
 
 def test_get_invalid_config(config):
diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index 51f7c22..aac5db2 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -197,6 +197,9 @@ def test_distinct():
     assert df_a.collect() == df_b.collect()
 
 
+@pytest.mark.skip(
+    reason="https://github.com/apache/arrow-datafusion-python/issues/135"
+)
 def test_window_lead(df):
     df = df.select(
         column("a"),
diff --git a/datafusion/tests/test_config.py b/dev/python_lint.sh
old mode 100644
new mode 100755
similarity index 63%
copy from datafusion/tests/test_config.py
copy to dev/python_lint.sh
index 1e06161..9493462
--- a/datafusion/tests/test_config.py
+++ b/dev/python_lint.sh
@@ -1,3 +1,5 @@
+#!/bin/bash
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,27 +17,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datafusion import Config
-import pytest
-
-
-@pytest.fixture
-def config():
-    return Config()
-
-
-def test_get_then_set(config):
-    config_key = "datafusion.optimizer.filter_null_join_keys"
-
-    assert config.get(config_key).as_py() is False
-
-    config.set(config_key, True)
-    assert config.get(config_key).as_py() is True
-
-
-def test_get_all(config):
-    config.get_all()
-
+# This script runs all the Python lints locally the same way the
+# DataFusion CI does
 
-def test_get_invalid_config(config):
-    assert config.get("not.valid.key") is None
+set -e
+source venv/bin/activate
+flake8 --exclude venv --ignore=E501,W503
+black --line-length 79 .
diff --git a/src/catalog.rs b/src/catalog.rs
index d7a6b8a..2929a88 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -21,6 +21,8 @@ use std::sync::Arc;
 use pyo3::exceptions::PyKeyError;
 use pyo3::prelude::*;
 
+use crate::errors::DataFusionError;
+use crate::utils::wait_for_future;
 use datafusion::{
     arrow::pyarrow::PyArrowConvert,
     catalog::{catalog::CatalogProvider, schema::SchemaProvider},
@@ -88,13 +90,11 @@ impl PyDatabase {
         self.database.table_names().into_iter().collect()
     }
 
-    fn table(&self, name: &str) -> PyResult<PyTable> {
-        match self.database.table(name) {
-            Some(table) => Ok(PyTable::new(table)),
-            None => Err(PyKeyError::new_err(format!(
-                "Table with name {} doesn't exist.",
-                name
-            ))),
+    fn table(&self, name: &str, py: Python) -> PyResult<PyTable> {
+        if let Some(table) = wait_for_future(py, self.database.table(name)) {
+            Ok(PyTable::new(table))
+        } else {
+            Err(DataFusionError::Common(format!("Table not found: {}", name)).into())
         }
     }
 
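A note for readers porting similar bindings: DataFusion 16 made the
`SchemaProvider::table` lookup async, so the code above now blocks on the
returned future through the crate's `wait_for_future` helper instead of
calling `table` directly. A minimal sketch of what such a helper can look
like, assuming tokio and pyo3 0.17 (the crate's actual helper presumably
reuses one shared runtime rather than building a fresh one per call):

    use std::future::Future;

    use pyo3::Python;
    use tokio::runtime::Runtime;

    /// Block the current thread on `fut`, releasing the Python GIL so
    /// other Python threads can make progress while the query runs.
    fn wait_for_future<F>(py: Python, fut: F) -> F::Output
    where
        F: Future + Send,
        F::Output: Send,
    {
        let runtime = Runtime::new().expect("failed to create tokio runtime");
        py.allow_threads(|| runtime.block_on(fut))
    }
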
diff --git a/src/config.rs b/src/config.rs
index 05a6a0c..d5d5777 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -38,27 +38,37 @@ impl PyConfig {
 
     /// Get configurations from environment variables
     #[staticmethod]
-    pub fn from_env() -> Self {
-        Self {
-            config: ConfigOptions::from_env(),
-        }
+    pub fn from_env() -> PyResult<Self> {
+        Ok(Self {
+            config: ConfigOptions::from_env()?,
+        })
     }
 
     /// Get a configuration option
     pub fn get(&mut self, key: &str, py: Python) -> PyResult<PyObject> {
-        Ok(self.config.get(key).into_py(py))
+        let options = self.config.to_owned();
+        for entry in options.entries() {
+            if entry.key == key {
+                return Ok(entry.value.into_py(py));
+            }
+        }
+        Ok(None::<String>.into_py(py))
     }
 
     /// Set a configuration option
-    pub fn set(&mut self, key: &str, value: PyObject, py: Python) {
-        self.config.set(key, py_obj_to_scalar_value(py, value))
+    pub fn set(&mut self, key: &str, value: PyObject, py: Python) -> PyResult<()> {
+        let scalar_value = py_obj_to_scalar_value(py, value);
+        self.config
+            .set(key, scalar_value.to_string().as_str())
+            .map_err(|e| e.into())
     }
 
     /// Get all configuration options
     pub fn get_all(&mut self, py: Python) -> PyResult<PyObject> {
         let dict = PyDict::new(py);
-        for (key, value) in self.config.options() {
-            dict.set_item(key, value.clone().into_py(py))?;
+        let options = self.config.to_owned();
+        for entry in options.entries() {
+            dict.set_item(entry.key, entry.value.clone().into_py(py))?;
         }
         Ok(dict.into())
     }
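
The rewrite above tracks DataFusion 16's new `ConfigOptions` surface:
options live in typed fields but are read and written through a uniform
string interface, which is why `get` and `get_all` now return stringified
values (and why the Python tests above assert against "true"/"false").
A standalone sketch of that interface, using a key taken from the tests:

    use datafusion::config::ConfigOptions;

    fn main() -> datafusion::error::Result<()> {
        let mut options = ConfigOptions::default();
        // Values arrive as strings and are parsed into the typed field.
        options.set("datafusion.optimizer.filter_null_join_keys", "true")?;

        // entries() flattens every option into key / Option<String> pairs.
        for entry in options.entries() {
            if entry.key == "datafusion.optimizer.filter_null_join_keys" {
                println!("{} = {:?}", entry.key, entry.value);
            }
        }
        Ok(())
    }
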
diff --git a/src/context.rs b/src/context.rs
index 344d5c7..2ffa3da 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -25,8 +25,6 @@ use uuid::Uuid;
 use pyo3::exceptions::{PyKeyError, PyValueError};
 use pyo3::prelude::*;
 
-use parking_lot::RwLock;
-
 use crate::catalog::{PyCatalog, PyTable};
 use crate::dataframe::PyDataFrame;
 use crate::dataset::Dataset;
@@ -38,11 +36,13 @@ use crate::utils::wait_for_future;
 use datafusion::arrow::datatypes::{DataType, Schema};
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::config::ConfigOptions;
 use datafusion::datasource::datasource::TableProvider;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::prelude::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
+use datafusion::prelude::{
+    AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions,
+};
+use datafusion_common::ScalarValue;
 
 /// `PySessionContext` is able to plan and execute DataFusion plans.
 /// It has a powerful optimizer, a physical planner for local execution, and a
@@ -79,41 +79,32 @@ impl PySessionContext {
         parquet_pruning: bool,
         target_partitions: Option<usize>,
         config_options: Option<HashMap<String, String>>,
-    ) -> Self {
-        let mut options = ConfigOptions::from_env();
-        if let Some(hash_map) = config_options {
-            for (k, v) in &hash_map {
-                if let Ok(v) = v.parse::<bool>() {
-                    options.set_bool(k, v);
-                } else if let Ok(v) = v.parse::<u64>() {
-                    options.set_u64(k, v);
-                } else {
-                    options.set_string(k, v);
-                }
-            }
-        }
-        let config_options = Arc::new(RwLock::new(options));
-
+    ) -> PyResult<Self> {
         let mut cfg = SessionConfig::new()
-            .create_default_catalog_and_schema(create_default_catalog_and_schema)
-            .with_default_catalog_and_schema(default_catalog, default_schema)
             .with_information_schema(information_schema)
             .with_repartition_joins(repartition_joins)
             .with_repartition_aggregations(repartition_aggregations)
             .with_repartition_windows(repartition_windows)
             .with_parquet_pruning(parquet_pruning);
 
-        // TODO we should add a `with_config_options` to `SessionConfig`
-        cfg.config_options = config_options;
+        if create_default_catalog_and_schema {
+            cfg = cfg.with_default_catalog_and_schema(default_catalog, default_schema);
+        }
+
+        if let Some(hash_map) = config_options {
+            for (k, v) in &hash_map {
+                cfg = cfg.set(k, ScalarValue::Utf8(Some(v.clone())));
+            }
+        }
 
         let cfg_full = match target_partitions {
             None => cfg,
             Some(x) => cfg.with_target_partitions(x),
         };
 
-        PySessionContext {
+        Ok(PySessionContext {
             ctx: SessionContext::with_config(cfg_full),
-        }
+        })
     }
 
     /// Register an object store with the given name
@@ -161,6 +152,7 @@ impl PySessionContext {
     fn create_dataframe(
         &mut self,
         partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
+        py: Python,
     ) -> PyResult<PyDataFrame> {
         let schema = partitions.0[0][0].schema();
         let table = MemTable::try_new(schema, partitions.0).map_err(DataFusionError::from)?;
@@ -175,7 +167,8 @@ impl PySessionContext {
         self.ctx
             .register_table(&*name, Arc::new(table))
             .map_err(DataFusionError::from)?;
-        let table = self.ctx.table(&*name).map_err(DataFusionError::from)?;
+
+        let table = wait_for_future(py, self._table(&name)).map_err(DataFusionError::from)?;
 
         let df = PyDataFrame::new(table);
         Ok(df)
@@ -311,8 +304,9 @@ impl PySessionContext {
         self.ctx.tables().unwrap()
     }
 
-    fn table(&self, name: &str) -> PyResult<PyDataFrame> {
-        Ok(PyDataFrame::new(self.ctx.table(name)?))
+    fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
+        let x = wait_for_future(py, self.ctx.table(name)).map_err(DataFusionError::from)?;
+        Ok(PyDataFrame::new(x))
     }
 
     fn table_exist(&self, name: &str) -> PyResult<bool> {
@@ -348,12 +342,16 @@ impl PySessionContext {
             .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
         let mut options = NdJsonReadOptions::default()
             .table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
-        options.schema = schema.map(|s| Arc::new(s.0));
         options.schema_infer_max_records = schema_infer_max_records;
         options.file_extension = file_extension;
-
-        let result = self.ctx.read_json(path, options);
-        let df = wait_for_future(py, result).map_err(DataFusionError::from)?;
+        let df = if let Some(schema) = schema {
+            options.schema = Some(&schema.0);
+            let result = self.ctx.read_json(path, options);
+            wait_for_future(py, result).map_err(DataFusionError::from)?
+        } else {
+            let result = self.ctx.read_json(path, options);
+            wait_for_future(py, result).map_err(DataFusionError::from)?
+        };
         Ok(PyDataFrame::new(df))
     }
 
@@ -451,11 +449,21 @@ impl PySessionContext {
         let mut options = AvroReadOptions::default()
             .table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
         options.file_extension = file_extension;
-        options.schema = schema.map(|s| Arc::new(s.0));
+        let df = if let Some(schema) = schema {
+            options.schema = Some(&schema.0);
+            let read_future = self.ctx.read_avro(path, options);
+            wait_for_future(py, read_future).map_err(DataFusionError::from)?
+        } else {
+            let read_future = self.ctx.read_avro(path, options);
+            wait_for_future(py, read_future).map_err(DataFusionError::from)?
+        };
+        Ok(PyDataFrame::new(df))
+    }
+}
 
-        let result = self.ctx.read_avro(path, options);
-        let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
-        Ok(df)
+impl PySessionContext {
+    async fn _table(&self, name: &str) -> datafusion_common::Result<DataFrame> {
+        self.ctx.table(name).await
     }
 }
 
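Two API shifts drive the context changes: `SessionConfig` options are now
set through `set(key, ScalarValue)` rather than a shared `ConfigOptions`
handle, and `SessionContext::table` is async, hence the `wait_for_future`
calls. A minimal end-to-end sketch against DataFusion 16 (the table name
and option key are illustrative):

    use datafusion::common::ScalarValue;
    use datafusion::execution::context::{SessionConfig, SessionContext};

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // String-typed ScalarValues are parsed into the typed option.
        let config = SessionConfig::new()
            .with_information_schema(true)
            .set(
                "datafusion.execution.batch_size",
                ScalarValue::Utf8(Some("1024".to_string())),
            );
        let ctx = SessionContext::with_config(config);

        ctx.sql("CREATE TABLE t AS VALUES (1), (2)").await?;
        // table() is async as of DataFusion 16.
        let df = ctx.table("t").await?;
        df.show().await?;
        Ok(())
    }
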
diff --git a/src/dataframe.rs b/src/dataframe.rs
index ab8f2c4..b56cadf 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -38,8 +38,8 @@ pub(crate) struct PyDataFrame {
 
 impl PyDataFrame {
     /// creates a new PyDataFrame
-    pub fn new(df: Arc<DataFrame>) -> Self {
-        Self { df }
+    pub fn new(df: DataFrame) -> Self {
+        Self { df: Arc::new(df) }
     }
 }
 
@@ -71,50 +71,54 @@ impl PyDataFrame {
 
     #[args(args = "*")]
     fn select_columns(&self, args: Vec<&str>) -> PyResult<Self> {
-        let df = self.df.select_columns(&args)?;
+        let df = self.df.as_ref().clone().select_columns(&args)?;
         Ok(Self::new(df))
     }
 
     #[args(args = "*")]
     fn select(&self, args: Vec<PyExpr>) -> PyResult<Self> {
         let expr = args.into_iter().map(|e| e.into()).collect();
-        let df = self.df.select(expr)?;
+        let df = self.df.as_ref().clone().select(expr)?;
         Ok(Self::new(df))
     }
 
     fn filter(&self, predicate: PyExpr) -> PyResult<Self> {
-        let df = self.df.filter(predicate.into())?;
+        let df = self.df.as_ref().clone().filter(predicate.into())?;
         Ok(Self::new(df))
     }
 
     fn with_column(&self, name: &str, expr: PyExpr) -> PyResult<Self> {
-        let df = self.df.with_column(name, expr.into())?;
+        let df = self.df.as_ref().clone().with_column(name, expr.into())?;
         Ok(Self::new(df))
     }
 
     /// Rename one column by applying a new projection. This is a no-op if the column to be
     /// renamed does not exist.
     fn with_column_renamed(&self, old_name: &str, new_name: &str) -> PyResult<Self> {
-        let df = self.df.with_column_renamed(old_name, new_name)?;
+        let df = self
+            .df
+            .as_ref()
+            .clone()
+            .with_column_renamed(old_name, new_name)?;
         Ok(Self::new(df))
     }
 
     fn aggregate(&self, group_by: Vec<PyExpr>, aggs: Vec<PyExpr>) -> PyResult<Self> {
         let group_by = group_by.into_iter().map(|e| e.into()).collect();
         let aggs = aggs.into_iter().map(|e| e.into()).collect();
-        let df = self.df.aggregate(group_by, aggs)?;
+        let df = self.df.as_ref().clone().aggregate(group_by, aggs)?;
         Ok(Self::new(df))
     }
 
     #[args(exprs = "*")]
     fn sort(&self, exprs: Vec<PyExpr>) -> PyResult<Self> {
         let exprs = exprs.into_iter().map(|e| e.into()).collect();
-        let df = self.df.sort(exprs)?;
+        let df = self.df.as_ref().clone().sort(exprs)?;
         Ok(Self::new(df))
     }
 
     fn limit(&self, count: usize) -> PyResult<Self> {
-        let df = self.df.limit(0, Some(count))?;
+        let df = self.df.as_ref().clone().limit(0, Some(count))?;
         Ok(Self::new(df))
     }
 
@@ -122,7 +126,7 @@ impl PyDataFrame {
     /// Unless some order is specified in the plan, there is no
     /// guarantee of the order of the result.
     fn collect(&self, py: Python) -> PyResult<Vec<PyObject>> {
-        let batches = wait_for_future(py, self.df.collect())?;
+        let batches = wait_for_future(py, self.df.as_ref().clone().collect())?;
         // cannot use PyResult<Vec<RecordBatch>> return type due to
         // https://github.com/PyO3/pyo3/issues/1813
         batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
@@ -130,14 +134,14 @@ impl PyDataFrame {
 
     /// Cache DataFrame.
     fn cache(&self, py: Python) -> PyResult<Self> {
-        let df = wait_for_future(py, self.df.cache())?;
+        let df = wait_for_future(py, self.df.as_ref().clone().cache())?;
         Ok(Self::new(df))
     }
 
     /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
     /// maintaining the input partitioning.
     fn collect_partitioned(&self, py: Python) -> PyResult<Vec<Vec<PyObject>>> {
-        let batches = wait_for_future(py, self.df.collect_partitioned())?;
+        let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())?;
 
         batches
             .into_iter()
@@ -148,14 +152,14 @@ impl PyDataFrame {
     /// Print the result, 20 lines by default
     #[args(num = "20")]
     fn show(&self, py: Python, num: usize) -> PyResult<()> {
-        let df = self.df.limit(0, Some(num))?;
+        let df = self.df.as_ref().clone().limit(0, Some(num))?;
         let batches = wait_for_future(py, df.collect())?;
         pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
     }
 
     /// Filter out duplicate rows
     fn distinct(&self) -> PyResult<Self> {
-        let df = self.df.distinct()?;
+        let df = self.df.as_ref().clone().distinct()?;
         Ok(Self::new(df))
     }
 
@@ -181,23 +185,31 @@ impl PyDataFrame {
             }
         };
 
-        let df = self
-            .df
-            .join(right.df, join_type, &join_keys.0, &join_keys.1, None)?;
+        let df = self.df.as_ref().clone().join(
+            right.df.as_ref().clone(),
+            join_type,
+            &join_keys.0,
+            &join_keys.1,
+            None,
+        )?;
         Ok(Self::new(df))
     }
 
     /// Print the query plan
     #[args(verbose = false, analyze = false)]
     fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyResult<()> {
-        let df = self.df.explain(verbose, analyze)?;
+        let df = self.df.as_ref().clone().explain(verbose, analyze)?;
         let batches = wait_for_future(py, df.collect())?;
         pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
     }
 
     /// Repartition a `DataFrame` based on a logical partitioning scheme.
     fn repartition(&self, num: usize) -> PyResult<Self> {
-        let new_df = self.df.repartition(Partitioning::RoundRobinBatch(num))?;
+        let new_df = self
+            .df
+            .as_ref()
+            .clone()
+            .repartition(Partitioning::RoundRobinBatch(num))?;
         Ok(Self::new(new_df))
     }
 
@@ -205,7 +217,11 @@ impl PyDataFrame {
     #[args(args = "*", num)]
     fn repartition_by_hash(&self, args: Vec<PyExpr>, num: usize) -> PyResult<Self> {
         let expr = args.into_iter().map(|py_expr| py_expr.into()).collect();
-        let new_df = self.df.repartition(Partitioning::Hash(expr, num))?;
+        let new_df = self
+            .df
+            .as_ref()
+            .clone()
+            .repartition(Partitioning::Hash(expr, num))?;
         Ok(Self::new(new_df))
     }
 
@@ -214,9 +230,12 @@ impl PyDataFrame {
     #[args(distinct = false)]
     fn union(&self, py_df: PyDataFrame, distinct: bool) -> PyResult<Self> {
         let new_df = if distinct {
-            self.df.union_distinct(py_df.df)?
+            self.df
+                .as_ref()
+                .clone()
+                .union_distinct(py_df.df.as_ref().clone())?
         } else {
-            self.df.union(py_df.df)?
+            self.df.as_ref().clone().union(py_df.df.as_ref().clone())?
         };
 
         Ok(Self::new(new_df))
@@ -225,37 +244,45 @@ impl PyDataFrame {
     /// Calculate the distinct union of two `DataFrame`s.  The
     /// two `DataFrame`s must have exactly the same schema
     fn union_distinct(&self, py_df: PyDataFrame) -> PyResult<Self> {
-        let new_df = self.df.union_distinct(py_df.df)?;
+        let new_df = self
+            .df
+            .as_ref()
+            .clone()
+            .union_distinct(py_df.df.as_ref().clone())?;
         Ok(Self::new(new_df))
     }
 
     /// Calculate the intersection of two `DataFrame`s.  The two `DataFrame`s must have exactly the same schema
     fn intersect(&self, py_df: PyDataFrame) -> PyResult<Self> {
-        let new_df = self.df.intersect(py_df.df)?;
+        let new_df = self
+            .df
+            .as_ref()
+            .clone()
+            .intersect(py_df.df.as_ref().clone())?;
         Ok(Self::new(new_df))
     }
 
     /// Calculate the exception of two `DataFrame`s.  The two `DataFrame`s must have exactly the same schema
     fn except_all(&self, py_df: PyDataFrame) -> PyResult<Self> {
-        let new_df = self.df.except(py_df.df)?;
+        let new_df = self.df.as_ref().clone().except(py_df.df.as_ref().clone())?;
         Ok(Self::new(new_df))
     }
 
     /// Write a `DataFrame` to a CSV file.
     fn write_csv(&self, path: &str, py: Python) -> PyResult<()> {
-        wait_for_future(py, self.df.write_csv(path))?;
+        wait_for_future(py, self.df.as_ref().clone().write_csv(path))?;
         Ok(())
     }
 
     /// Write a `DataFrame` to a Parquet file.
     fn write_parquet(&self, path: &str, py: Python) -> PyResult<()> {
-        wait_for_future(py, self.df.write_parquet(path, None))?;
+        wait_for_future(py, self.df.as_ref().clone().write_parquet(path, None))?;
         Ok(())
     }
 
     /// Executes a query and writes the results to a partitioned JSON file.
     fn write_json(&self, path: &str, py: Python) -> PyResult<()> {
-        wait_for_future(py, self.df.write_json(path))?;
+        wait_for_future(py, self.df.as_ref().clone().write_json(path))?;
         Ok(())
     }
 }
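
The pervasive `self.df.as_ref().clone()` pattern above exists because
DataFusion 16's `DataFrame` methods consume `self` instead of taking
`&self` on an `Arc<DataFrame>`. The clone is cheap: a `DataFrame` holds a
session handle and a logical plan, not materialized data. A condensed
sketch of the wrapper pattern, assuming only that `DataFrame` is `Clone`:

    use std::sync::Arc;

    use datafusion::dataframe::DataFrame;
    use datafusion::error::Result;

    /// Keep the frame behind an Arc so Python-side handles stay cheap
    /// to share across threads.
    struct PyDataFrame {
        df: Arc<DataFrame>,
    }

    impl PyDataFrame {
        /// Clone the plan-holding struct, let the owning-self API
        /// consume the clone, and re-wrap the result.
        fn limit(&self, count: usize) -> Result<Self> {
            let df = self.df.as_ref().clone().limit(0, Some(count))?;
            Ok(Self { df: Arc::new(df) })
        }
    }
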
diff --git a/src/dataset_exec.rs b/src/dataset_exec.rs
index 9c41218..be6dc1e 100644
--- a/src/dataset_exec.rs
+++ b/src/dataset_exec.rs
@@ -162,10 +162,6 @@ impl ExecutionPlan for DatasetExec {
         })
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         None
     }
diff --git a/src/functions.rs b/src/functions.rs
index 5a0a20d..d40ffbe 100644
--- a/src/functions.rs
+++ b/src/functions.rs
@@ -17,10 +17,12 @@
 
 use pyo3::{prelude::*, wrap_pyfunction};
 
-use datafusion::physical_plan::aggregates::AggregateFunction;
-use datafusion_expr::{lit, BuiltinScalarFunction};
+use datafusion_expr::expr::AggregateFunction;
+use datafusion_expr::expr::{Sort, WindowFunction};
+use datafusion_expr::window_function::find_df_window_func;
+use datafusion_expr::{lit, BuiltinScalarFunction, WindowFrame};
 
-use crate::errors;
+use crate::errors::DataFusionError;
 use crate::expression::PyExpr;
 
 #[pyfunction]
@@ -63,11 +65,11 @@ fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
 #[pyfunction]
 fn order_by(expr: PyExpr, asc: Option<bool>, nulls_first: Option<bool>) -> PyResult<PyExpr> {
     Ok(PyExpr {
-        expr: datafusion_expr::Expr::Sort {
+        expr: datafusion_expr::Expr::Sort(Sort {
             expr: Box::new(expr.expr),
             asc: asc.unwrap_or(true),
             nulls_first: nulls_first.unwrap_or(true),
-        },
+        }),
     })
 }
 
@@ -87,11 +89,14 @@ fn window(
     partition_by: Option<Vec<PyExpr>>,
     order_by: Option<Vec<PyExpr>>,
 ) -> PyResult<PyExpr> {
-    use std::str::FromStr;
-    let fun = datafusion_expr::window_function::WindowFunction::from_str(name)
-        .map_err(|e| -> errors::DataFusionError { e.into() })?;
+    let fun = find_df_window_func(name);
+    if fun.is_none() {
+        return Err(DataFusionError::Common("window function not found".to_string()).into());
+    }
+    let fun = fun.unwrap();
+    let window_frame = WindowFrame::new(order_by.is_some());
     Ok(PyExpr {
-        expr: datafusion_expr::Expr::WindowFunction {
+        expr: datafusion_expr::Expr::WindowFunction(WindowFunction {
             fun,
             args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
             partition_by: partition_by
@@ -104,8 +109,8 @@ fn window(
                 .into_iter()
                 .map(|x| x.expr)
                 .collect::<Vec<_>>(),
-            window_frame: None,
-        },
+            window_frame,
+        }),
     })
 }
 
@@ -135,12 +140,12 @@ macro_rules! aggregate_function {
         #[doc = $DOC]
         #[pyfunction(args = "*", distinct = "false")]
         fn $NAME(args: Vec<PyExpr>, distinct: bool) -> PyExpr {
-            let expr = datafusion_expr::Expr::AggregateFunction {
-                fun: AggregateFunction::$FUNC,
+            let expr = datafusion_expr::Expr::AggregateFunction(AggregateFunction {
+                fun: datafusion_expr::aggregate_function::AggregateFunction::$FUNC,
                 args: args.into_iter().map(|e| e.into()).collect(),
                 distinct,
                 filter: None,
-            };
+            });
             expr.into()
         }
     };
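
DataFusion 16 also reshaped the `Expr` enum: `Sort`, `WindowFunction`,
and `AggregateFunction` are now named structs under
`datafusion_expr::expr` wrapped by the matching variants, and window
expressions carry a concrete `WindowFrame` instead of an
`Option<WindowFrame>`. A small sketch of the new construction style for
a sort key (the helper name is illustrative):

    use datafusion_expr::expr::Sort;
    use datafusion_expr::{col, Expr};

    /// Build a descending, nulls-last sort expression over `name`.
    fn sort_desc_nulls_last(name: &str) -> Expr {
        // Expr::Sort wraps a named struct rather than an anonymous
        // struct variant as of DataFusion 16.
        Expr::Sort(Sort {
            expr: Box::new(col(name)),
            asc: false,
            nulls_first: false,
        })
    }
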
diff --git a/src/udaf.rs b/src/udaf.rs
index 66dc274..9d96e2c 100644
--- a/src/udaf.rs
+++ b/src/udaf.rs
@@ -24,9 +24,7 @@ use datafusion::arrow::datatypes::DataType;
 use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
 use datafusion::common::ScalarValue;
 use datafusion::error::{DataFusionError, Result};
-use datafusion_expr::{
-    create_udaf, Accumulator, AccumulatorFunctionImplementation, AggregateState, AggregateUDF,
-};
+use datafusion_expr::{create_udaf, Accumulator, AccumulatorFunctionImplementation, AggregateUDF};
 
 use crate::expression::PyExpr;
 use crate::utils::parse_volatility;
@@ -43,13 +41,9 @@ impl RustAccumulator {
 }
 
 impl Accumulator for RustAccumulator {
-    fn state(&self) -> Result<Vec<AggregateState>> {
-        let py_result: PyResult<Vec<ScalarValue>> =
-            Python::with_gil(|py| self.accum.as_ref(py).call_method0("state")?.extract());
-        match py_result {
-            Ok(r) => Ok(r.into_iter().map(AggregateState::Scalar).collect()),
-            Err(e) => Err(DataFusionError::Execution(format!("{}", e))),
-        }
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Python::with_gil(|py| self.accum.as_ref(py).call_method0("state")?.extract())
+            .map_err(|e| DataFusionError::Execution(format!("{}", e)))
     }
 
     fn evaluate(&self) -> Result<ScalarValue> {