You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/08 12:02:36 UTC

[arrow-ballista] branch master updated: Upgrade to DataFusion 13.0.0-rc1 (#325)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 28b9f41b Upgrade to DataFusion 13.0.0-rc1 (#325)
28b9f41b is described below

commit 28b9f41b600cd3888e2522adb592676b17b15e1a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sat Oct 8 06:02:32 2022 -0600

    Upgrade to DataFusion 13.0.0-rc1 (#325)
---
 ballista-cli/Cargo.toml            |  4 ++--
 ballista/rust/client/Cargo.toml    |  6 +++---
 ballista/rust/core/Cargo.toml      |  8 ++++----
 ballista/rust/executor/Cargo.toml  |  8 ++++----
 ballista/rust/scheduler/Cargo.toml |  6 +++---
 benchmarks/Cargo.toml              |  4 ++--
 examples/Cargo.toml                |  2 +-
 python/Cargo.toml                  |  2 +-
 python/src/ballista_context.rs     |  5 +++--
 python/src/context.rs              | 15 ++++++++-------
 python/src/dataframe.rs            | 12 +++++++-----
 python/src/dataset.rs              | 10 +++++++++-
 python/src/dataset_exec.rs         | 12 +++++++++---
 python/src/expression.rs           |  6 +++---
 python/src/udaf.rs                 | 15 ++++++++-------
 python/src/udf.rs                  | 14 +++++++-------
 16 files changed, 74 insertions(+), 55 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 9f28ee9a..ae6dea44 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/rust/client", version = "0.8.0", features = [
     "standalone",
 ] }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index a7ab2164..f08a4b7d 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
 ballista-core = { path = "../core", version = "0.8.0" }
 ballista-executor = { path = "../executor", version = "0.8.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.8.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
-sqlparser = "0.23"
+sqlparser = "0.25"
 tempfile = "3"
 tokio = "1.0"
 
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 6699af01..d834ce10 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -41,13 +41,13 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.8", default-features = false }
 
-arrow-flight = { version = "23.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 datafusion-objectstore-hdfs = { version = "0.1.0", optional = true }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 futures = "0.3"
 hashbrown = "0.12"
 
@@ -63,7 +63,7 @@ prost = "0.11"
 prost-types = "0.11"
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
-sqlparser = "0.23"
+sqlparser = "0.25"
 tokio = "1.0"
 tokio-stream = { version = "0.1", features = ["net"] }
 tonic = "0.8"
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index ca3532ac..9eda5278 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -34,15 +34,15 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "23.0.0" }
-arrow-flight = { version = "23.0.0" }
+arrow = { version = "24.0.0" }
+arrow-flight = { version = "24.0.0" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.8.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 futures = "0.3"
 hyper = "0.14.4"
 log = "0.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 90e3f020..ccf5bd6f 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -38,7 +38,7 @@ sled = ["sled_package", "tokio-stream"]
 
 [dependencies]
 anyhow = "1"
-arrow-flight = { version = "23.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
 async-recursion = "1.0.0"
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.8.0" }
@@ -46,8 +46,8 @@ base64 = { version = "0.13", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 etcd-client = { version = "0.9", optional = true }
 flatbuffers = { version = "2.1.2" }
 futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 167dbee5..553e94ee 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.8.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 7170365e..6a1967a3 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.8.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index c5d2e8e6..829be3be 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
 [dependencies]
 async-trait = "0.1"
 ballista = { path = "../ballista/rust/client", version = "0.8.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1", features = ["pyarrow"] }
 futures = "0.3"
 mimalloc = { version = "*", optional = true, default-features = false }
 pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }
diff --git a/python/src/ballista_context.rs b/python/src/ballista_context.rs
index 2d92e970..d059ba72 100644
--- a/python/src/ballista_context.rs
+++ b/python/src/ballista_context.rs
@@ -25,6 +25,7 @@ use crate::dataframe::PyDataFrame;
 use crate::errors::BallistaError;
 use ballista::prelude::{BallistaConfig, BallistaContext};
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
 
 /// `PyBallistaContext` is able to plan and execute DataFusion plans.
@@ -83,7 +84,7 @@ impl PyBallistaContext {
         &mut self,
         name: &str,
         path: PathBuf,
-        schema: Option<Schema>,
+        schema: Option<PyArrowType<Schema>>,
         has_header: bool,
         delimiter: &str,
         schema_infer_max_records: usize,
@@ -108,7 +109,7 @@ impl PyBallistaContext {
             .delimiter(delimiter[0])
             .schema_infer_max_records(schema_infer_max_records)
             .file_extension(file_extension);
-        options.schema = schema.as_ref();
+        options.schema = schema.as_ref().map(|x| &x.0);
 
         let result = ctx.register_csv(name, path, options);
         wait_for_future(py, result).map_err(BallistaError::from)?;
diff --git a/python/src/context.rs b/python/src/context.rs
index 8dfa9bef..38a3cb9b 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -24,6 +24,7 @@ use pyo3::exceptions::{PyKeyError, PyValueError};
 use pyo3::prelude::*;
 
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::datasource::TableProvider;
 use datafusion::datasource::MemTable;
@@ -100,9 +101,9 @@ impl PySessionContext {
 
     fn create_dataframe(
         &mut self,
-        partitions: Vec<Vec<RecordBatch>>,
+        partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
     ) -> PyResult<PyDataFrame> {
-        let table = MemTable::try_new(partitions[0][0].schema(), partitions)
+        let table = MemTable::try_new(partitions.0[0][0].schema(), partitions.0)
             .map_err(DataFusionError::from)?;
 
         // generate a random (unique) name for this table
@@ -138,10 +139,10 @@ impl PySessionContext {
     fn register_record_batches(
         &mut self,
         name: &str,
-        partitions: Vec<Vec<RecordBatch>>,
+        partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
     ) -> PyResult<()> {
-        let schema = partitions[0][0].schema();
-        let table = MemTable::try_new(schema, partitions)?;
+        let schema = partitions.0[0][0].schema();
+        let table = MemTable::try_new(schema, partitions.0)?;
         self.ctx
             .register_table(name, Arc::new(table))
             .map_err(DataFusionError::from)?;
@@ -184,7 +185,7 @@ impl PySessionContext {
         &mut self,
         name: &str,
         path: PathBuf,
-        schema: Option<Schema>,
+        schema: Option<PyArrowType<Schema>>,
         has_header: bool,
         delimiter: &str,
         schema_infer_max_records: usize,
@@ -206,7 +207,7 @@ impl PySessionContext {
             .delimiter(delimiter[0])
             .schema_infer_max_records(schema_infer_max_records)
             .file_extension(file_extension);
-        options.schema = schema.as_ref();
+        options.schema = schema.as_ref().map(|x| &x.0);
 
         let result = self.ctx.register_csv(name, path, options);
         wait_for_future(py, result).map_err(DataFusionError::from)?;
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index af1bcf0c..bc1a9a68 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -18,7 +18,7 @@
 use crate::utils::wait_for_future;
 use crate::{errors::DataFusionError, expression::PyExpr};
 use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
 use datafusion::arrow::util::pretty;
 use datafusion::dataframe::DataFrame;
 use datafusion::logical_plan::JoinType;
@@ -65,8 +65,8 @@ impl PyDataFrame {
     }
 
     /// Returns the schema from the logical plan
-    fn schema(&self) -> Schema {
-        self.df.schema().into()
+    fn schema(&self) -> PyArrowType<Schema> {
+        PyArrowType(self.df.schema().into())
     }
 
     #[args(args = "*")]
@@ -126,7 +126,8 @@ impl PyDataFrame {
     fn show(&self, py: Python, num: usize) -> PyResult<()> {
         let df = self.df.limit(0, Some(num))?;
         let batches = wait_for_future(py, df.collect())?;
-        Ok(pretty::print_batches(&batches)?)
+        pretty::print_batches(&batches)
+            .map_err(|err| PyArrowException::new_err(err.to_string()))
     }
 
     fn join(
@@ -162,6 +163,7 @@ impl PyDataFrame {
     fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyResult<()> {
         let df = self.df.explain(verbose, analyze)?;
         let batches = wait_for_future(py, df.collect())?;
-        Ok(pretty::print_batches(&batches)?)
+        pretty::print_batches(&batches)
+            .map_err(|err| PyArrowException::new_err(err.to_string()))
     }
 }
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index a678b904..d34d974f 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::datasource::datasource::TableProviderFilterPushDown;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::{DataFusionError, Result as DFResult};
@@ -74,7 +75,14 @@ impl TableProvider for Dataset {
         Python::with_gil(|py| {
             let dataset = self.dataset.as_ref(py);
             // This can panic but since we checked that self.dataset is a pyarrow.dataset.Dataset it should never
-            Arc::new(dataset.getattr("schema").unwrap().extract().unwrap())
+            Arc::new(
+                dataset
+                    .getattr("schema")
+                    .unwrap()
+                    .extract::<PyArrowType<_>>()
+                    .unwrap()
+                    .0,
+            )
         })
     }
 
diff --git a/python/src/dataset_exec.rs b/python/src/dataset_exec.rs
index f57a3a76..987b7ad1 100644
--- a/python/src/dataset_exec.rs
+++ b/python/src/dataset_exec.rs
@@ -28,6 +28,7 @@ use futures::stream;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
 use datafusion::execution::context::TaskContext;
@@ -54,7 +55,7 @@ impl Iterator for PyArrowBatchesAdapter {
             Some(
                 batches
                     .next()?
-                    .and_then(|batch| batch.extract())
+                    .and_then(|batch| Ok(batch.extract::<PyArrowType<_>>()?.0))
                     .map_err(|err| ArrowError::ExternalError(Box::new(err))),
             )
         })
@@ -109,7 +110,12 @@ impl DatasetExec {
 
         let scanner = dataset.call_method("scanner", (), Some(kwargs))?;
 
-        let schema = Arc::new(scanner.getattr("projected_schema")?.extract()?);
+        let schema = Arc::new(
+            scanner
+                .getattr("projected_schema")?
+                .extract::<PyArrowType<_>>()?
+                .0,
+        );
 
         let builtins = Python::import(py, "builtins")?;
         let pylist = builtins.getattr("list")?;
@@ -211,7 +217,7 @@ impl ExecutionPlan for DatasetExec {
             let schema: SchemaRef = Arc::new(
                 scanner
                     .getattr("projected_schema")
-                    .and_then(|schema| schema.extract())
+                    .and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
                     .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
             );
             let record_batches: &PyIterator = scanner
diff --git a/python/src/expression.rs b/python/src/expression.rs
index b3275ccf..aa6e540c 100644
--- a/python/src/expression.rs
+++ b/python/src/expression.rs
@@ -19,8 +19,8 @@ use pyo3::{basic::CompareOp, prelude::*};
 use std::convert::{From, Into};
 
 use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::logical_plan::{col, lit, Expr};
-
 use datafusion::scalar::ScalarValue;
 
 /// An PyExpr that can be used on a DataFrame
@@ -125,12 +125,12 @@ impl PyExpr {
         self.expr.clone().is_null().into()
     }
 
-    pub fn cast(&self, to: DataType) -> PyExpr {
+    pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
         // self.expr.cast_to() requires DFSchema to validate that the cast
         // is supported, omit that for now
         let expr = Expr::Cast {
             expr: Box::new(self.expr.clone()),
-            data_type: to,
+            data_type: to.0,
         };
         expr.into()
     }
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 3b93048b..f2973476 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -21,7 +21,7 @@ use pyo3::{prelude::*, types::PyTuple};
 
 use datafusion::arrow::array::ArrayRef;
 use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
 use datafusion::common::ScalarValue;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{
@@ -82,6 +82,7 @@ impl Accumulator for RustAccumulator {
 
             // 1. cast states to Pyarrow array
             let state = state
+                .data()
                 .to_pyarrow(py)
                 .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
 
@@ -120,18 +121,18 @@ impl PyAggregateUDF {
     fn new(
         name: &str,
         accumulator: PyObject,
-        input_type: DataType,
-        return_type: DataType,
-        state_type: Vec<DataType>,
+        input_type: PyArrowType<DataType>,
+        return_type: PyArrowType<DataType>,
+        state_type: PyArrowType<Vec<DataType>>,
         volatility: &str,
     ) -> PyResult<Self> {
         let function = logical_expr::create_udaf(
             name,
-            input_type,
-            Arc::new(return_type),
+            input_type.0,
+            Arc::new(return_type.0),
             parse_volatility(volatility)?,
             to_rust_accumulator(accumulator),
-            Arc::new(state_type),
+            Arc::new(state_type.0),
         );
         Ok(Self { function })
     }
diff --git a/python/src/udf.rs b/python/src/udf.rs
index 8cf8c9cf..4f188f7e 100644
--- a/python/src/udf.rs
+++ b/python/src/udf.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;
 
 use pyo3::{prelude::*, types::PyTuple};
 
-use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::array::{make_array, ArrayData, ArrayRef};
 use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
 use datafusion::error::DataFusionError;
 use datafusion::logical_expr::{self, function::ScalarFunctionImplementation};
 use datafusion::physical_plan::functions::make_scalar_function;
@@ -52,7 +52,7 @@ fn to_rust_function(func: PyObject) -> ScalarFunctionImplementation {
                 }?;
 
                 // 3. cast to arrow::array::Array
-                let array = ArrayRef::from_pyarrow(value).unwrap();
+                let array = make_array(ArrayData::from_pyarrow(value).unwrap());
                 Ok(array)
             })
         },
@@ -72,14 +72,14 @@ impl PyScalarUDF {
     fn new(
         name: &str,
         func: PyObject,
-        input_types: Vec<DataType>,
-        return_type: DataType,
+        input_types: PyArrowType<Vec<DataType>>,
+        return_type: PyArrowType<DataType>,
         volatility: &str,
     ) -> PyResult<Self> {
         let function = logical_expr::create_udf(
             name,
-            input_types,
-            Arc::new(return_type),
+            input_types.0,
+            Arc::new(return_type.0),
             parse_volatility(volatility)?,
             to_rust_function(func),
         );