Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/27 11:49:50 UTC

[GitHub] [arrow-datafusion] gandronchik opened a new pull request, #2177: udtf support

gandronchik opened a new pull request, #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177

   UDTF support (user-defined functions returning a table)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1113608594

   @gandronchik thank you for the explanation in this PR's description. It helps, though I will admit I still don't fully understand what is going on.
   
   I agree with @doki23 -- I expect a table function to logically return a table (that is, something with both rows and columns).
   
   > Regarding signature, I decided to use a single vector and vector with sizes of sections instead of vec of vecs to have better performance. If we use Vec, this will require a lot of memory in case of a request for millions of rows.
   
   The way the rest of DataFusion avoids buffering all the intermediate results in memory at once is with `Stream`s, but that requires interacting with Rust's `async` ecosystem, which is non-trivial.
   
   If you wanted a streaming solution, the signature might look something like the following (maybe):
   
   
   ```rust
   Arc<dyn Fn(SendableRecordBatchStream) -> Result<SendableRecordBatchStream> + Send + Sync>;
   ```
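A minimal, std-only sketch of what this streaming shape means in practice, under stated assumptions: a synchronous `Iterator` stands in for DataFusion's async `SendableRecordBatchStream`, a `Vec<i64>` stands in for an Arrow `RecordBatch`, and `BatchStream`, `StreamingTableFn`, and `expand_series` are hypothetical names, not DataFusion APIs. The function consumes input batches lazily and yields output batches lazily, so only the batch currently being processed has to be held in memory.

```rust
use std::sync::Arc;

/// Toy stand-in for an Arrow `RecordBatch`: a single Int64 column.
type Batch = Vec<i64>;

/// Hypothetical synchronous stand-in for DataFusion's async `SendableRecordBatchStream`.
type BatchStream = Box<dyn Iterator<Item = Result<Batch, String>> + Send>;

/// Streaming shape: a stream of input batches in, a stream of output batches out.
type StreamingTableFn =
    Arc<dyn Fn(BatchStream) -> Result<BatchStream, String> + Send + Sync>;

/// Hypothetical function: each input value `n` expands into the rows `1..=n`.
/// Output batches are produced one at a time, as input batches are pulled.
fn expand_series() -> StreamingTableFn {
    Arc::new(|input: BatchStream| -> Result<BatchStream, String> {
        let output = input.map(|batch| {
            // Errors from upstream pass through unchanged.
            batch.map(|values| values.iter().flat_map(|&n| 1..=n).collect::<Batch>())
        });
        let out: BatchStream = Box::new(output);
        Ok(out)
    })
}

fn main() {
    let input: BatchStream =
        Box::new(vec![Ok::<Batch, String>(vec![2, 3]), Ok(vec![1])].into_iter());
    let f = expand_series();
    for batch in f(input).unwrap() {
        println!("{:?}", batch.unwrap());
    }
}
```

In an async version, the same adapter would be written over a `futures` `Stream` instead of an `Iterator`; the memory argument is unchanged.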




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2177: udtf support

alamb commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r849778359


##########
datafusion/core/src/physical_plan/udtf.rs:
##########
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! UDTF support

Review Comment:
   ```suggestion
   //! User Defined Table Function (UDTF) support
   ```



##########
datafusion/physical-expr/src/functions.rs:
##########
@@ -146,3 +146,107 @@ impl PhysicalExpr for ScalarFunctionExpr {
         (fun)(&inputs)
     }
 }
+
+pub struct TableFunctionExpr {
+    fun: TableFunctionImplementation,
+    name: String,
+    args: Vec<Arc<dyn PhysicalExpr>>,
+    return_type: DataType,
+}
+
+impl Debug for TableFunctionExpr {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("TableFunctionExpr")
+            .field("fun", &"<FUNC>")
+            .field("name", &self.name)
+            .field("args", &self.args)
+            .field("return_type", &self.return_type)
+            .finish()
+    }
+}
+
+impl TableFunctionExpr {
+    /// Create a new Table function
+    pub fn new(
+        name: &str,
+        fun: TableFunctionImplementation,
+        args: Vec<Arc<dyn PhysicalExpr>>,
+        return_type: &DataType,
+    ) -> Self {
+        Self {
+            fun,
+            name: name.to_owned(),
+            args,
+            return_type: return_type.clone(),
+        }
+    }
+
+    /// Get the table function implementation
+    pub fn fun(&self) -> &TableFunctionImplementation {
+        &self.fun
+    }
+
+    /// The name for this expression
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Input arguments
+    pub fn args(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.args
+    }
+
+    /// Data type produced by this expression
+    pub fn return_type(&self) -> &DataType {
+        &self.return_type
+    }
+}
+
+impl fmt::Display for TableFunctionExpr {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "{}({})",
+            self.name,
+            self.args
+                .iter()
+                .map(|e| format!("{}", e))
+                .collect::<Vec<String>>()
+                .join(", ")
+        )
+    }
+}
+
+impl PhysicalExpr for TableFunctionExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(self.return_type.clone())
+    }
+
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        Ok(true)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        // evaluate the arguments, if there are no arguments we'll instead pass in a null array
+        // indicating the batch size (as a convention)
+        let inputs = match (self.args.len(), self.name.parse::<BuiltinScalarFunction>()) {

Review Comment:
   Why are we parsing the table function name using `BuiltinScalarFunction`? Don't we already have `self.fun`?



##########
datafusion/expr/src/udtf.rs:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Udtf module contains foundational types that are used to represent UDTFs in DataFusion.

Review Comment:
   ```suggestion
   //! Contains foundational types that are used to represent User Defined Table Functions (UDTFs) in DataFusion.
   ```





[GitHub] [arrow-datafusion] gandronchik closed pull request #2177: udtf support

gandronchik closed pull request #2177: udtf support
URL: https://github.com/apache/arrow-datafusion/pull/2177




[GitHub] [arrow-datafusion] gandronchik commented on pull request #2177: User Defined Table Function (udtf) support

gandronchik commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1118507143

   > @gandronchik thank you for the explanation in this PR's description. It helps, though I will admit I still don't fully understand what is going on.
   > 
   > I agree with @doki23 -- I expect a table function to logically return a table (that is, something with both rows and columns).
   > 
   > > Regarding signature, I decided to use a single vector and vector with sizes of sections instead of vec of vecs to have better performance. If we use Vec, this will require a lot of memory in case of a request for millions of rows.
   > 
   > The way the rest of DataFusion avoids buffering all the intermediate results in memory at once is with `Stream`s, but that requires interacting with Rust's `async` ecosystem, which is non-trivial.
   > 
   > If you wanted a streaming solution, the signature might look something like the following (maybe):
   > 
   > ```rust
   > Arc<dyn Fn(SendableRecordBatchStream) -> Result<SendableRecordBatchStream> + Send + Sync>;
   > ```
   
   It looks like I got the title wrong. I have implemented a function that returns many rows; it is probably not a table function. If I rename it, would that be fine?
   
   Regarding the function signature, I think my solution is a compromise between `Vec<Vec<_>>` and streaming. Actually, I don't think such a function can return that many rows. However, of course, I will rewrite it if you want. So which solution do we choose: the current `Result<(ArrayRef, Vec<usize>)>`, `Result<Vec<ColumnarValue>>`, or `Result<SendableRecordBatchStream>`?
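A minimal sketch of the first option, assuming (from the earlier description of "a single vector and vector with sizes of sections") that the `ArrayRef` holds every output value back to back and the `Vec<usize>` holds how many output rows each input row produced. A plain `Vec<i64>` stands in for `ArrayRef`, and `to_flat` / `from_flat` are hypothetical helpers that convert to and from the `Vec<Vec<_>>` layout:

```rust
/// Flat encoding: all values in one buffer plus one section length per input row.
/// `Vec<i64>` stands in for an Arrow `ArrayRef` (illustrative assumption).
fn to_flat(sections: &[Vec<i64>]) -> (Vec<i64>, Vec<usize>) {
    let sizes: Vec<usize> = sections.iter().map(|s| s.len()).collect();
    let values: Vec<i64> = sections.iter().flatten().copied().collect();
    (values, sizes)
}

/// Inverse: split the flat buffer back into one section per input row.
fn from_flat(values: &[i64], sizes: &[usize]) -> Vec<Vec<i64>> {
    let mut out = Vec::with_capacity(sizes.len());
    let mut offset = 0;
    for &len in sizes {
        out.push(values[offset..offset + len].to_vec());
        offset += len;
    }
    out
}

fn main() {
    // Two input rows, e.g. integer_series(1, 3) and integer_series(2, 4).
    let sections = vec![vec![1, 2, 3], vec![2, 3, 4]];
    let (values, sizes) = to_flat(&sections);
    println!("{:?} {:?}", values, sizes); // [1, 2, 3, 2, 3, 4] [3, 3]
    assert_eq!(from_flat(&values, &sizes), sections);
}
```

The flat layout needs two allocations regardless of the number of input rows, whereas `Vec<Vec<_>>` needs one per input row; both, however, materialize the whole result, which is the cost the streaming signature avoids.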




[GitHub] [arrow-datafusion] gandronchik commented on a diff in pull request #2177: User Defined Table Function (udtf) support

gandronchik commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861673710


##########
datafusion/core/src/execution/context.rs:
##########
@@ -2115,6 +2141,195 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn user_defined_table_function() -> Result<()> {
+        let mut ctx = SessionContext::new();
+
+        let integer_series = integer_udtf();
+        ctx.register_udtf(create_udtf(
+            "integer_series",
+            vec![DataType::Int64, DataType::Int64],
+            Arc::new(DataType::Int64),
+            Volatility::Immutable,
+            integer_series,
+        ));
+
+        let struct_func = struct_udtf();
+        ctx.register_udtf(create_udtf(
+            "struct_func",
+            vec![DataType::Int64],
+            Arc::new(DataType::Struct(
+                [
+                    Field::new("f1", DataType::Utf8, false),
+                    Field::new("f2", DataType::Int64, false),
+                ]
+                .to_vec(),
+            )),
+            Volatility::Immutable,
+            struct_func,
+        ));
+
+        let result = plan_and_collect(&ctx, "SELECT struct_func(5)").await?;
+
+        let expected = vec![
+            "+-------------------------+",
+            "| struct_func(Int64(5))   |",
+            "+-------------------------+",
+            "| {\"f1\": \"test\", \"f2\": 5} |",
+            "+-------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(6,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(6),Int64(5)) |",
+            "+-----------------------------------+",
+            "+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(1,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(1),Int64(5)) |",
+            "+-----------------------------------+",
+            "| 1                                 |",
+            "| 2                                 |",
+            "| 3                                 |",
+            "| 4                                 |",
+            "| 5                                 |",
+            "+-----------------------------------+",
+        ];

Review Comment:
   I didn't add support for that. You can use structs instead.





[GitHub] [arrow-datafusion] doki23 commented on pull request #2177: User Defined Table Function (udtf) support

doki23 commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1113175672

   > I don't think it should return multiple columns; structs are usually used for this.
   
   I cannot agree. The result of a *table function* represents a temporary table. Since it's a table, it shouldn't be limited to one column. Of course, one column of struct type can solve the problem, but it's different: we cannot directly use `ORDER BY` or other SQL clauses on it without first extracting the struct fields.




[GitHub] [arrow-datafusion] alamb commented on pull request #2177: User Defined Table Function (udtf) support

alamb commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1169115997

   > @alamb Hello! Have you had time to check the PR yet?
   
   Hi @gandronchik, sadly I have not had a chance. I apologize for my lack of bandwidth, but it is hard to find enough contiguous time to review such large PRs when I don't have the background context.
   
   My core problem is that I don't understand (despite your admirable attempts to clarify) what this PR is trying to implement, so it is very hard to evaluate the code to see if it is implementing what is desired (because I don't understand what is desired).
   
   For example, all the examples of "set returning functions" in the Postgres links you shared appear to use those functions as elements of the `FROM` clause, such as:
   
   ```sql
   select * from unnest(ARRAY[1,2], ARRAY['foo','bar','baz']) as x(a,b);
   ```
   
   So I am struggling to understand the examples in the PR's description that use these functions in combination with a column 🤔
   
   ```sql
   select table_fun(1, col) from (select 2 col union all select 3 col) t;
   ```
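The PR's own test (`SELECT asd, struct_func(qwe), integer_series(asd, qwe), integer_series(1, qwe) r FROM ...`) suggests the intended semantics for set-returning calls in the select list: each input row is repeated as many times as the longest function result, non-function columns are copied onto every output row, and shorter results are padded with NULL. Postgres 10 and later describe similar lockstep behavior for multiple set-returning functions in one select list. A std-only sketch of that expansion, using `Option<i64>` for nullable cells; `integer_series` and `expand_row` are hypothetical helpers, not code from the PR:

```rust
/// Hypothetical set-returning function: the integers from `start` to `end` inclusive.
/// An empty range (start > end) yields zero rows.
fn integer_series(start: i64, end: i64) -> Vec<i64> {
    (start..=end).collect()
}

/// Expand one input row `(asd, qwe)` for the select list
/// `asd, integer_series(asd, qwe), integer_series(1, qwe)`.
/// The row is repeated once per element of the longest series; shorter series
/// are padded with `None` (NULL). If every series is empty, no rows are emitted.
fn expand_row(asd: i64, qwe: i64) -> Vec<(i64, Option<i64>, Option<i64>)> {
    let a = integer_series(asd, qwe);
    let b = integer_series(1, qwe);
    let n = a.len().max(b.len());
    (0..n)
        .map(|i| (asd, a.get(i).copied(), b.get(i).copied()))
        .collect()
}

fn main() {
    // Same input rows as the PR test: (1, 3) and (2, 4).
    for (asd, qwe) in vec![(1, 3), (2, 4)] {
        for row in expand_row(asd, qwe) {
            println!("{:?}", row);
        }
    }
}
```

Running it on `(1, 3)` and `(2, 4)` gives three and four output rows respectively, matching the `integer_series` columns of the expected table in the PR's test; when the series is empty (as with `integer_series(6,5)`), the input row produces no output.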
   
   So what would you think about implementing more general user defined table functions (that can return RecordBatches / streams as we have discussed above)? I think others would also likely use such functionality, and it seems like it would satisfy the use cases from cube.js (?)
   
   
   
   




[GitHub] [arrow-datafusion] gandronchik commented on pull request #2177: User Defined Table Function (udtf) support

gandronchik commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1166569358

   > Thanks @gandronchik -- I will try and find time to re-review this PR over the next few days in light of the information above.
   
   @alamb Hello! Have you had time to check the PR yet?




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2177: User Defined Table Function (udtf) support

alamb commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861245271


##########
datafusion/core/src/execution/context.rs:
##########
@@ -2115,6 +2141,195 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn user_defined_table_function() -> Result<()> {
+        let mut ctx = SessionContext::new();
+
+        let integer_series = integer_udtf();
+        ctx.register_udtf(create_udtf(
+            "integer_series",
+            vec![DataType::Int64, DataType::Int64],
+            Arc::new(DataType::Int64),
+            Volatility::Immutable,
+            integer_series,
+        ));
+
+        let struct_func = struct_udtf();
+        ctx.register_udtf(create_udtf(
+            "struct_func",
+            vec![DataType::Int64],
+            Arc::new(DataType::Struct(
+                [
+                    Field::new("f1", DataType::Utf8, false),
+                    Field::new("f2", DataType::Int64, false),
+                ]
+                .to_vec(),
+            )),
+            Volatility::Immutable,
+            struct_func,
+        ));
+
+        let result = plan_and_collect(&ctx, "SELECT struct_func(5)").await?;
+
+        let expected = vec![
+            "+-------------------------+",
+            "| struct_func(Int64(5))   |",
+            "+-------------------------+",
+            "| {\"f1\": \"test\", \"f2\": 5} |",
+            "+-------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(6,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(6),Int64(5)) |",
+            "+-----------------------------------+",
+            "+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(1,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(1),Int64(5)) |",
+            "+-----------------------------------+",
+            "| 1                                 |",
+            "| 2                                 |",
+            "| 3                                 |",
+            "| 4                                 |",
+            "| 5                                 |",
+            "+-----------------------------------+",
+        ];

Review Comment:
   This is a good example of a `UDTF` producing more rows than went in 👍
   
   Would it be possible to write an example that also produces a different number of *columns* than went in? I think that is what @Ted-Jiang and I are pointing out in our comments below.
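A minimal sketch of the kind of example being asked for: a table function with one input argument that returns two columns. A toy `Table` (named Int64 columns of equal length) stands in for a `RecordBatch`, and `series_with_squares` is a hypothetical function, not part of the PR:

```rust
/// Toy stand-in for a RecordBatch: named Int64 columns of equal length.
#[derive(Debug, PartialEq)]
struct Table {
    names: Vec<String>,
    columns: Vec<Vec<i64>>,
}

impl Table {
    /// Row count, taken from the first column (0 if there are no columns).
    fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.len())
    }
}

/// Hypothetical table function: one input argument `n`, two output columns
/// (`i`, `i_squared`) with one row for each i in 1..=n.
fn series_with_squares(n: i64) -> Table {
    let i: Vec<i64> = (1..=n).collect();
    let i_squared: Vec<i64> = i.iter().map(|v| v * v).collect();
    Table {
        names: vec!["i".to_string(), "i_squared".to_string()],
        columns: vec![i, i_squared],
    }
}

fn main() {
    let t = series_with_squares(3);
    println!("{:?} rows={}", t.names, t.num_rows());
    for row in 0..t.num_rows() {
        let values: Vec<i64> = t.columns.iter().map(|c| c[row]).collect();
        println!("{:?}", values);
    }
}
```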



##########
datafusion/core/src/execution/context.rs:
##########
@@ -2115,6 +2141,195 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn user_defined_table_function() -> Result<()> {
+        let mut ctx = SessionContext::new();
+
+        let integer_series = integer_udtf();
+        ctx.register_udtf(create_udtf(
+            "integer_series",
+            vec![DataType::Int64, DataType::Int64],
+            Arc::new(DataType::Int64),
+            Volatility::Immutable,
+            integer_series,
+        ));
+
+        let struct_func = struct_udtf();
+        ctx.register_udtf(create_udtf(
+            "struct_func",
+            vec![DataType::Int64],
+            Arc::new(DataType::Struct(
+                [
+                    Field::new("f1", DataType::Utf8, false),
+                    Field::new("f2", DataType::Int64, false),
+                ]
+                .to_vec(),
+            )),
+            Volatility::Immutable,
+            struct_func,
+        ));
+
+        let result = plan_and_collect(&ctx, "SELECT struct_func(5)").await?;
+
+        let expected = vec![
+            "+-------------------------+",
+            "| struct_func(Int64(5))   |",
+            "+-------------------------+",
+            "| {\"f1\": \"test\", \"f2\": 5} |",
+            "+-------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(6,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(6),Int64(5)) |",
+            "+-----------------------------------+",
+            "+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(1,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(1),Int64(5)) |",
+            "+-----------------------------------+",
+            "| 1                                 |",
+            "| 2                                 |",
+            "| 3                                 |",
+            "| 4                                 |",
+            "| 5                                 |",
+            "+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(
+            &ctx,
+            "SELECT asd, struct_func(qwe), integer_series(asd, qwe), integer_series(1, qwe) r FROM (select 1 asd, 3 qwe UNION ALL select 2 asd, 4 qwe) x",
+        )
+        .await?;
+
+        let expected = vec![
+            "+-----+-------------------------+-----------------------------+---+",
+            "| asd | struct_func(x.qwe)      | integer_series(x.asd,x.qwe) | r |",
+            "+-----+-------------------------+-----------------------------+---+",
+            "| 1   | {\"f1\": \"test\", \"f2\": 3} | 1                           | 1 |",
+            "| 1   |                         | 2                           | 2 |",
+            "| 1   |                         | 3                           | 3 |",
+            "| 2   | {\"f1\": \"test\", \"f2\": 4} | 2                           | 1 |",
+            "| 2   |                         | 3                           | 2 |",
+            "| 2   |                         | 4                           | 3 |",
+            "| 2   |                         |                             | 4 |",
+            "+-----+-------------------------+-----------------------------+---+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result =
+            plan_and_collect(&ctx, "SELECT * from integer_series(1,5) pos(n)").await?;

Review Comment:
   Can you explain what this test is supposed to be demonstrating? I am not quite sure what it shows



##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   I am also a little mystified by this signature. It looks like "Second tuple" was the start of a thought that didn't get finished? I also don't understand what the `usize` in the tuple represents -- perhaps you can add some comments explaining its purpose?
   
   Also, I agree with @Ted-Jiang's analysis -- I would expect this signature to return a "table" (aka a `RecordBatch`, or a `Vec<ColumnarValue>` if preferred).
   
   Perhaps something like
   
   ```rust
   Arc<dyn Fn(&[ColumnarValue]) -> Result<RecordBatch> + Send + Sync>;
   ```
   
   or
   
   ```rust
   Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>;
   ```
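To illustrate the second proposal, here is a std-only sketch in which a toy Int64-only `ColumnarValue` enum stands in for DataFusion's real `ColumnarValue`, and `String` stands in for `DataFusionError`; `integer_series_with_pos` is hypothetical. The number of output columns is simply the length of the returned `Vec`, so a function can return more columns than it received arguments:

```rust
use std::sync::Arc;

/// Toy stand-in for DataFusion's `ColumnarValue` (Int64-only, for illustration).
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Array(Vec<i64>),
    Scalar(i64),
}

/// Shape of the proposed signature: arguments in, one value per output column out.
type TableFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>, String> + Send + Sync>;

/// Hypothetical `integer_series(start, end)` returning two columns:
/// the series itself and each element's 1-based position in it.
fn integer_series_with_pos() -> TableFunctionImplementation {
    Arc::new(|args: &[ColumnarValue]| -> Result<Vec<ColumnarValue>, String> {
        match args {
            [ColumnarValue::Scalar(start), ColumnarValue::Scalar(end)] => {
                let values: Vec<i64> = (*start..=*end).collect();
                let pos: Vec<i64> = (1..=values.len() as i64).collect();
                Ok(vec![ColumnarValue::Array(values), ColumnarValue::Array(pos)])
            }
            _ => Err("integer_series expects two scalar Int64 arguments".to_string()),
        }
    })
}

fn main() {
    let f = integer_series_with_pos();
    let args = vec![ColumnarValue::Scalar(2), ColumnarValue::Scalar(4)];
    let columns = f(args.as_slice()).unwrap();
    println!("{} output columns: {:?}", columns.len(), columns);
}
```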
   
   
   





[GitHub] [arrow-datafusion] doki23 commented on a diff in pull request #2177: User Defined Table Function (udtf) support

doki23 commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861450604


##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   I guess that @gandronchik wants to chain each result (`ArrayRef`) of `TableFunctionImplementation` into a multi-column result (see the code in `TableFunStream::batch`), which may mean that a table UDF consists of multiple exprs. The reason is probably that the `PhysicalExpr` trait only provides `fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>`. But I agree that `Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>` is more appropriate. So I believe the approach could be to invoke the table UDF directly in `TableFunStream` without implementing the `PhysicalExpr` trait for it.





[GitHub] [arrow-datafusion] thinkharderdev commented on pull request #2177: udtf support

thinkharderdev commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1100014400

   > Hmm... I have some questions about this PR. If we treat a UDTF as an expression, does that mean it can only produce one column? As I mentioned before ([#2177 (comment)](https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1094208748)), it's more like a table, so we should be able to `select * from` it and get any number of columns. I'm confused; could you please explain it to me? @alamb @gandronchik
   
   I had the same question. I'm not sure I understand how this is different from a scalar function. It seems like a table function should produce `RecordBatch`s and effectively compile down to an `ExecutionPlan`. 




[GitHub] [arrow-datafusion] Ted-Jiang commented on a diff in pull request #2177: User Defined Table Function (udtf) support

Ted-Jiang commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r860543127


##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   Or, in this case, it could generate an N*M table.





[GitHub] [arrow-datafusion] doki23 commented on a diff in pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
doki23 commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861450604


##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   I guess that @gandronchik wants to chain each result(ArrayRef) of `TableFunctionImplementation` into a multi-column result (see the code in `TableFunStream::batch`), which may mean the table udf consists of multi exprs. The reason should be trait `PhysicalExpr` only provides `fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>`. But I agree that `Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>` is more proper. So I believe that the approach may be invoke the table udf in the TableFunStream without implementing trait `PhysicalExpr` for it. 





[GitHub] [arrow-datafusion] alamb commented on pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1135961673

   I think adding UDTFs (aka user-defined table functions) that produce a two-dimensional table output (aka `Vec<RecordBatch>` or a `SendableRecordBatchStream`) would be a valuable addition to DataFusion.
   
   I think Spark calls these "table-valued functions":
   
   https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-qry-select-tvf.html
   
   Postgres calls them table functions:
   
   https://www.postgresql.org/docs/7.3/xfunc-tablefunctions.html
   
   However, as far as I can see, this PR does not implement table functions. I still don't fully understand the use case for the code in this PR, a function that returns a single column of values, and I don't know of any other system that implements such functions. Thus I feel that this PR adds a feature that is not widely useful to DataFusion users as a whole, so I don't feel I can approve it.
   
   If others (users or maintainers) have a perspective on this issue, I would love to hear them too. If there is broader support for this feature, I won't oppose merging it. 
   




[GitHub] [arrow-datafusion] alamb commented on pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1153117627

   Thanks @gandronchik  -- I will try and find time to re-review this PR over the next few days in light of the information above.




[GitHub] [arrow-datafusion] doki23 commented on a diff in pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
doki23 commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861450604


##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   I guess that @gandronchik wants to chain each result (`ArrayRef`) of `TableFunctionImplementation` into a multi-column result (see the code in `TableFunStream::batch`), which may mean that the table UDF consists of multiple expressions. The reason is probably that the trait `PhysicalExpr` only provides `fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>`. But I agree that `Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>` is more appropriate. So I believe the approach could be either to invoke the table UDF directly in the `TableFunStream` without implementing the trait `PhysicalExpr` for it, or to add `fn evaluate(&self, batch: &RecordBatch) -> Result<Vec<ColumnarValue>>` to `PhysicalExpr`.
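   A minimal sketch of the first option, assuming simplified stand-in types (`ColumnarValue` holds a `Vec<i64>` instead of an Arrow array, and errors are `String`s instead of `DataFusionError`). The helper `invoke_into_batch` is hypothetical: it calls the table function directly, as a stream could, and turns the returned values into equal-length columns without going through `PhysicalExpr`.

   ```rust
   use std::sync::Arc;

   // Simplified stand-ins (assumption): real code would use Arrow arrays
   // and DataFusion's `ColumnarValue`/`Result`.
   #[derive(Debug, Clone, PartialEq)]
   pub enum ColumnarValue {
       Array(Vec<i64>),
       Scalar(i64),
   }

   pub type TableFunctionImplementation =
       Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>, String> + Send + Sync>;

   /// Hypothetical helper: invoke the table function directly and convert its
   /// outputs into equal-length columns, broadcasting scalars.
   pub fn invoke_into_batch(
       fun: &TableFunctionImplementation,
       args: &[ColumnarValue],
   ) -> Result<Vec<Vec<i64>>, String> {
       let outputs = fun(args)?;
       // Row count comes from the first array output (1 if every output is a scalar).
       let num_rows = outputs
           .iter()
           .find_map(|v| match v {
               ColumnarValue::Array(a) => Some(a.len()),
               ColumnarValue::Scalar(_) => None,
           })
           .unwrap_or(1);
       outputs
           .into_iter()
           .map(|v| match v {
               ColumnarValue::Array(a) if a.len() == num_rows => Ok(a),
               ColumnarValue::Array(a) => Err(format!(
                   "column has {} rows, expected {}",
                   a.len(),
                   num_rows
               )),
               ColumnarValue::Scalar(s) => Ok(vec![s; num_rows]),
           })
           .collect()
   }
   ```

   With a function that returns `1..=n` plus the scalar `n`, the call yields two columns of `n` rows each.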





[GitHub] [arrow-datafusion] thinkharderdev commented on pull request #2177: udtf support

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1100368801

   > what about `Result<Vec<ColumnarValue>>`. I already almost implemented it this way:)
   > 
   > > > It seems like a table function should produce RecordBatches and effectively compile down to an ExecutionPlan.
   > > 
   > > 
   > > I agree it should definitely produce `RecordBatch`
   
   That's essentially a `RecordBatch` :) 
   
   You could have 
   ```rust
   pub type TableFunctionImplementation =
       Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>;
   
   // This is a terrible name but this would be analogous to ReturnTypeFunction/StateTypeFunction
   pub type TableSchemaFunction = 
       Arc<dyn Fn(&[DataType]) -> Result<SchemaRef> + Send + Sync>; 
   ```
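   A rough sketch of what such a schema function could look like for a `generate_series`-style function. It uses simplified stand-ins (assumption): `DataType` is a two-variant enum and a schema is a list of `(name, type)` pairs rather than Arrow's `Schema`; the output column name `generate_series` is an illustrative choice.

   ```rust
   use std::sync::Arc;

   // Simplified stand-ins (assumption), not arrow's `DataType`/`SchemaRef`.
   #[derive(Debug, Clone, Copy, PartialEq)]
   pub enum DataType {
       Int64,
       Float64,
   }

   pub type Schema = Vec<(String, DataType)>;
   pub type SchemaRef = Arc<Schema>;

   pub type TableSchemaFunction =
       Arc<dyn Fn(&[DataType]) -> Result<SchemaRef, String> + Send + Sync>;

   /// Hypothetical schema function: 2 or 3 arguments of one shared type give a
   /// single output column of that type.
   pub fn generate_series_schema() -> TableSchemaFunction {
       Arc::new(|arg_types: &[DataType]| {
           let same_type = |t: DataType| arg_types.iter().all(|a| *a == t);
           match arg_types {
               [first, _] | [first, _, _] if same_type(*first) => {
                   Ok(Arc::new(vec![("generate_series".to_string(), *first)]))
               }
               _ => Err(format!("unsupported argument types: {:?}", arg_types)),
           }
       })
   }
   ```

   Like `ReturnTypeFunction`, this lets the planner know the output columns before any data is produced.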




[GitHub] [arrow-datafusion] alamb commented on pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1382717581

   This PR is more than 6 months old, so I am closing it for now to clean up the PR list. Please reopen it if this is a mistake and you plan to work on it more.




[GitHub] [arrow-datafusion] doki23 commented on a diff in pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
doki23 commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861450604


##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   I guess that @gandronchik wants to chain each result (`ArrayRef`) of `TableFunctionImplementation` into a multi-column result (see the code in `TableFunStream::batch`), which may mean that the table UDF consists of multiple expressions. The reason is probably that the trait `PhysicalExpr` only provides `fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>`. But I agree that `Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>` is more appropriate. So I believe the approach could be to invoke the table UDF directly in the `TableFunStream` without implementing the trait `PhysicalExpr` for it.





[GitHub] [arrow-datafusion] doki23 commented on a diff in pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
doki23 commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861450604


##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   I guess that @gandronchik wants to chain each result (`ArrayRef`) of `TableFunctionImplementation` into a multi-column result (see the code in `TableFunStream::batch`). But I agree that `Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>` is more appropriate.





[GitHub] [arrow-datafusion] gandronchik commented on pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
gandronchik commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1169719635

   @alamb Hello! I think it will be easier to understand what I implemented here if you check how the `generate_series` function works in Postgres. Just try running the following queries:




[GitHub] [arrow-datafusion] gandronchik commented on pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
gandronchik commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1152184255

   @alamb Hello! Sorry for the late response.
   
   I am sorry for such a big PR with such a poor description.
   
   Let me try to explain what is happening here.
   Honestly, I made a mistake with the naming: what I implemented is a Set Returning Function (https://www.postgresql.org/docs/current/functions-srf.html).
   
   As far as I know, DataFusion is oriented toward PostgreSQL behavior, so the functionality I provide here is Postgres functionality.
   
   We already use it in Cube.js, where we implemented several functions:
   - **generate_series** (https://www.postgresql.org/docs/current/functions-srf.html)
   - **generate_subscripts** (https://www.postgresql.org/docs/current/functions-srf.html)
   - **unnest** (https://www.postgresql.org/docs/current/functions-array.html)
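   For the `unnest` case, the same "values plus section sizes" shape used by this PR can be sketched with std types only. This is an illustration under assumptions, not the Cube.js code: each input row holds an optional list (`Option<Vec<i64>>` instead of an Arrow list array), and a NULL list contributes zero output rows, matching Postgres, where `unnest` of a NULL array returns no rows.

   ```rust
   /// Flattens one optional list per input row into a single values column plus
   /// per-row section sizes (a NULL list yields an empty section).
   pub fn unnest(lists: &[Option<Vec<i64>>]) -> (Vec<i64>, Vec<usize>) {
       let mut values = Vec::new();
       let mut section_sizes = Vec::with_capacity(lists.len());
       for list in lists {
           // Treat a NULL list as an empty one.
           let items: &[i64] = list.as_deref().unwrap_or(&[]);
           values.extend_from_slice(items);
           section_sizes.push(items.len());
       }
       (values, section_sizes)
   }
   ```

   The section sizes are what let the caller know which output rows belong to which input row.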
   
   Please take a closer look at my PR. I am ready to improve it, rename some structures, etc.
   
   
   Below, I provide the implementation of the generate_series function (a real Postgres function):
   
   ```rust
   macro_rules! generate_series_udtf {
       ($ARGS:expr, $TYPE: ident, $PRIMITIVE_TYPE: ident) => {{
           let mut section_sizes: Vec<usize> = Vec::new();
           let l_arr = &$ARGS[0].as_any().downcast_ref::<PrimitiveArray<$TYPE>>();
           if l_arr.is_some() {
               let l_arr = l_arr.unwrap();
               let r_arr = downcast_primitive_arg!($ARGS[1], "right", $TYPE);
               let step_arr = PrimitiveArray::<$TYPE>::from_value(1 as $PRIMITIVE_TYPE, 1);
               let step_arr = if $ARGS.len() > 2 {
                   downcast_primitive_arg!($ARGS[2], "step", $TYPE)
               } else {
                   &step_arr
               };
   
               let mut builder = PrimitiveBuilder::<$TYPE>::new(1);
               for (i, (start, end)) in l_arr.iter().zip(r_arr.iter()).enumerate() {
                   let step = if step_arr.len() > i {
                       step_arr.value(i)
                   } else {
                       step_arr.value(0)
                   };
   
                   let start = start.unwrap();
                   let end = end.unwrap();
                   let mut section_size: i64 = 0;
                   if start <= end && step > 0 as $PRIMITIVE_TYPE {
                       let mut current = start;
                       loop {
                           if current > end {
                               break;
                           }
                           builder.append_value(current).unwrap();
   
                           section_size += 1;
                           current += step;
                       }
                   }
                   section_sizes.push(section_size as usize);
               }
   
               return Ok((Arc::new(builder.finish()) as ArrayRef, section_sizes));
           }
       }};
   }
   
   pub fn create_generate_series_udtf() -> TableUDF {
       let fun = make_table_function(move |args: &[ArrayRef]| {
           assert!(args.len() == 2 || args.len() == 3);
   
           if args[0].as_any().downcast_ref::<Int64Array>().is_some() {
               generate_series_udtf!(args, Int64Type, i64)
           } else if args[0].as_any().downcast_ref::<Float64Array>().is_some() {
               generate_series_udtf!(args, Float64Type, f64)
           }
   
           Err(DataFusionError::Execution("Unsupported type".to_string()))
       });
   
       let return_type: ReturnTypeFunction = Arc::new(move |tp| {
           if tp.len() > 0 {
               Ok(Arc::new(tp[0].clone()))
           } else {
               Ok(Arc::new(DataType::Int64))
           }
       });
   
       TableUDF::new(
           "generate_series",
           &Signature::one_of(
               vec![
                   TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
                   TypeSignature::Exact(vec![DataType::Int64, DataType::Int64, DataType::Int64]),
                   TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]),
                   TypeSignature::Exact(vec![
                       DataType::Float64,
                       DataType::Float64,
                       DataType::Float64,
                   ]),
               ],
               Volatility::Immutable,
           ),
           &return_type,
           &fun,
       )
   }
   ```
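   Because the macro above depends on types introduced in this PR (`TableUDF`, `make_table_function`, `downcast_primitive_arg!`), here is a std-only sketch of just its core loop for `i64`, returning the flattened values plus one section size per input row. Assumptions that differ from the code above: a single scalar step, a NULL start or end yields an empty section instead of panicking on `unwrap()`, a negative step counts down, and a zero step is rejected (Postgres also rejects it).

   ```rust
   /// Sketch of `generate_series` over input rows: returns (values, section_sizes).
   pub fn generate_series(
       starts: &[Option<i64>],
       ends: &[Option<i64>],
       step: i64,
   ) -> Result<(Vec<i64>, Vec<usize>), String> {
       if step == 0 {
           return Err("step size cannot equal zero".to_string());
       }
       let mut values = Vec::new();
       let mut section_sizes = Vec::with_capacity(starts.len());
       for (start, end) in starts.iter().zip(ends) {
           let before = values.len();
           if let (Some(start), Some(end)) = (*start, *end) {
               let mut current = start;
               // Count up for a positive step, down for a negative one.
               while (step > 0 && current <= end) || (step < 0 && current >= end) {
                   values.push(current);
                   match current.checked_add(step) {
                       Some(next) => current = next,
                       None => break, // stop instead of overflowing
                   }
               }
           }
           section_sizes.push(values.len() - before);
       }
       Ok((values, section_sizes))
   }
   ```

   For example, start/end pairs `(1, 3)` and `(6, 5)` produce the values `1, 2, 3` with section sizes `[3, 0]`, which matches the empty result of `integer_series(6,5)` in the tests of this PR.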




[GitHub] [arrow-datafusion] xudong963 commented on a diff in pull request #2177: udtf support

Posted by GitBox <gi...@apache.org>.
xudong963 commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r846623581


##########
datafusion/core/src/optimizer/simplify_expressions.rs:
##########
@@ -381,6 +381,7 @@ impl<'a> ConstEvaluator<'a> {
             | Expr::QualifiedWildcard { .. } => false,
             Expr::ScalarFunction { fun, .. } => Self::volatility_ok(fun.volatility()),
             Expr::ScalarUDF { fun, .. } => Self::volatility_ok(fun.signature.volatility),
+            Expr::TableUDF { .. } => false,

Review Comment:
   ditto



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -99,6 +99,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
             Expr::ScalarUDF { fun, .. } => {
                 self.visit_volatility(fun.signature.volatility)
             }
+            Expr::TableUDF { fun, .. } => self.visit_volatility(fun.signature.volatility),

Review Comment:
   I recommend writing it like this:
   ```rust
               Expr::ScalarUDF { fun, .. } | Expr::TableUDF { fun, .. } => {
                   self.visit_volatility(fun.signature.volatility)
               }
   ```
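   One caveat worth checking: in Rust, a binding in an or-pattern must have the same type in every alternative. The suggestion above therefore compiles only if `ScalarUDF` and `TableUDF` store the same type in `fun`; if they hold different types, the compiler reports a mismatched-types error and two separate arms are needed. A minimal sketch with hypothetical, simplified types where both variants share one type:

   ```rust
   use std::sync::Arc;

   // Hypothetical, simplified types: both variants store `Arc<Udf>` in `fun`,
   // which is what makes the or-pattern below type-check.
   #[derive(Debug, Clone, Copy, PartialEq)]
   pub enum Volatility {
       Immutable,
       Volatile,
   }

   pub struct Signature {
       pub volatility: Volatility,
   }

   pub struct Udf {
       pub signature: Signature,
   }

   pub enum Expr {
       ScalarUDF { fun: Arc<Udf>, args: Vec<Expr> },
       TableUDF { fun: Arc<Udf>, args: Vec<Expr> },
       Column(String),
   }

   pub fn volatility_of(expr: &Expr) -> Option<Volatility> {
       match expr {
           // One arm covers both variants; `fun` is `&Arc<Udf>` in each alternative.
           Expr::ScalarUDF { fun, .. } | Expr::TableUDF { fun, .. } => {
               Some(fun.signature.volatility)
           }
           Expr::Column(_) => None,
       }
   }
   ```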





[GitHub] [arrow-datafusion] doki23 commented on pull request #2177: udtf support

Posted by GitBox <gi...@apache.org>.
doki23 commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1098869114

   Hmm... I have some concerns about this PR.
   If we treat a UDTF as an expression, does that mean it can only produce one column?
   As I mentioned before (https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1094208748), it's more like a table, so that we can `select * from` it and get any number of columns.
   I'm confused; could you please explain it to me? @alamb @gandronchik 
   




[GitHub] [arrow-datafusion] alamb commented on pull request #2177: udtf support

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1100151105

   > It seems like a table function should produce RecordBatches and effectively compile down to an ExecutionPlan.
   
   I agree it should definitely produce `RecordBatch` 
   




[GitHub] [arrow-datafusion] gandronchik commented on a diff in pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
gandronchik commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861672445


##########
datafusion/core/src/execution/context.rs:
##########
@@ -2115,6 +2141,195 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn user_defined_table_function() -> Result<()> {
+        let mut ctx = SessionContext::new();
+
+        let integer_series = integer_udtf();
+        ctx.register_udtf(create_udtf(
+            "integer_series",
+            vec![DataType::Int64, DataType::Int64],
+            Arc::new(DataType::Int64),
+            Volatility::Immutable,
+            integer_series,
+        ));
+
+        let struct_func = struct_udtf();
+        ctx.register_udtf(create_udtf(
+            "struct_func",
+            vec![DataType::Int64],
+            Arc::new(DataType::Struct(
+                [
+                    Field::new("f1", DataType::Utf8, false),
+                    Field::new("f2", DataType::Int64, false),
+                ]
+                .to_vec(),
+            )),
+            Volatility::Immutable,
+            struct_func,
+        ));
+
+        let result = plan_and_collect(&ctx, "SELECT struct_func(5)").await?;
+
+        let expected = vec![
+            "+-------------------------+",
+            "| struct_func(Int64(5))   |",
+            "+-------------------------+",
+            "| {\"f1\": \"test\", \"f2\": 5} |",
+            "+-------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(6,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(6),Int64(5)) |",
+            "+-----------------------------------+",
+            "+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(1,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(1),Int64(5)) |",
+            "+-----------------------------------+",
+            "| 1                                 |",
+            "| 2                                 |",
+            "| 3                                 |",
+            "| 4                                 |",
+            "| 5                                 |",
+            "+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(
+            &ctx,
+            "SELECT asd, struct_func(qwe), integer_series(asd, qwe), integer_series(1, qwe) r FROM (select 1 asd, 3 qwe UNION ALL select 2 asd, 4 qwe) x",
+        )
+        .await?;
+
+        let expected = vec![
+            "+-----+-------------------------+-----------------------------+---+",
+            "| asd | struct_func(x.qwe)      | integer_series(x.asd,x.qwe) | r |",
+            "+-----+-------------------------+-----------------------------+---+",
+            "| 1   | {\"f1\": \"test\", \"f2\": 3} | 1                           | 1 |",
+            "| 1   |                         | 2                           | 2 |",
+            "| 1   |                         | 3                           | 3 |",
+            "| 2   | {\"f1\": \"test\", \"f2\": 4} | 2                           | 1 |",
+            "| 2   |                         | 3                           | 2 |",
+            "| 2   |                         | 4                           | 3 |",
+            "| 2   |                         |                             | 4 |",
+            "+-----+-------------------------+-----------------------------+---+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result =
+            plan_and_collect(&ctx, "SELECT * from integer_series(1,5) pos(n)").await?;

Review Comment:
   I have just explained it in the PR description. I hope it is clear enough :)
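   The expected output of the multi-function query in this test suggests how several set-returning functions in one `SELECT` list are combined: each input row produces as many output rows as its longest section, and shorter sections are padded with NULL (the lockstep behavior Postgres 10+ uses). The sketch below reproduces that with std types only; it is an assumption about the semantics, not the PR's `TableFunStream` code, and it assumes every output covers the same input rows.

   ```rust
   /// Combines several `(values, section_sizes)` outputs computed over the same
   /// input rows. Input row `i` produces `max(section_sizes[i])` output rows;
   /// shorter sections are padded with `None` (NULL).
   pub fn align_sections(outputs: &[(Vec<i64>, Vec<usize>)]) -> Vec<Vec<Option<i64>>> {
       let num_input_rows = outputs.first().map_or(0, |(_, sizes)| sizes.len());
       // Start offset of the current input row's section within each output.
       let mut offsets = vec![0usize; outputs.len()];
       let mut rows = Vec::new();
       for i in 0..num_input_rows {
           let height = outputs.iter().map(|(_, sizes)| sizes[i]).max().unwrap_or(0);
           for j in 0..height {
               let row: Vec<Option<i64>> = outputs
                   .iter()
                   .zip(&offsets)
                   .map(|((values, sizes), &offset)| {
                       if j < sizes[i] {
                           Some(values[offset + j])
                       } else {
                           None
                       }
                   })
                   .collect();
               rows.push(row);
           }
           for (offset, (_, sizes)) in offsets.iter_mut().zip(outputs) {
               *offset += sizes[i];
           }
       }
       rows
   }
   ```

   Feeding it `integer_series(asd, qwe)` and `integer_series(1, qwe)` for the inputs `(1, 3)` and `(2, 4)` yields the 7 rows of the last two columns in the expected table, with the final row being `(NULL, 4)`.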





[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #2177: udtf support

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1109783993

   
   @alamb @thinkharderdev @doki23 I ran into the same problem in #2343.
   
   If we treat it as an `Expr`, we need to convert it to a `PhysicalExpr`, but `PhysicalExpr::evaluate` returns a single `ColumnarValue`:
   ```rust
   /// Evaluate an expression against a RecordBatch
       fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
   ```
   
   ```rust
   pub enum ColumnarValue {
       /// Array of values
       Array(ArrayRef),
       /// A single value
       Scalar(ScalarValue),
   }
   ```
   Because it returns a `ColumnarValue`, we cannot return the result as a table, am I right?
   
   Should I implement a `TablePhysicalExpr` using:
   ```rust
     fn evaluate(&self, batch: &RecordBatch) -> Result<Vec<ColumnarValue>>;
   ```
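   A minimal sketch of such a trait, using simplified stand-ins (assumption): a batch is a list of `Vec<i64>` columns, errors are `String`s, and `TablePhysicalExpr`/`WithSquare` are hypothetical names. Each call returns one `ColumnarValue` per output column, which is what allows an N*M result.

   ```rust
   // Simplified stand-ins (assumption), not arrow's `RecordBatch` or
   // DataFusion's `ColumnarValue`.
   #[derive(Debug, Clone, PartialEq)]
   pub enum ColumnarValue {
       Array(Vec<i64>),
       Scalar(i64),
   }

   pub struct RecordBatch {
       pub columns: Vec<Vec<i64>>,
   }

   /// Hypothetical trait: like `PhysicalExpr::evaluate`, but returning one
   /// `ColumnarValue` per output column.
   pub trait TablePhysicalExpr {
       fn evaluate(&self, batch: &RecordBatch) -> Result<Vec<ColumnarValue>, String>;
   }

   /// Hypothetical example: returns column `index` next to its squares (N*2 output).
   pub struct WithSquare {
       pub index: usize,
   }

   impl TablePhysicalExpr for WithSquare {
       fn evaluate(&self, batch: &RecordBatch) -> Result<Vec<ColumnarValue>, String> {
           let input = batch
               .columns
               .get(self.index)
               .ok_or_else(|| format!("no column at index {}", self.index))?;
           let squares: Vec<i64> = input.iter().map(|v| v * v).collect();
           Ok(vec![
               ColumnarValue::Array(input.clone()),
               ColumnarValue::Array(squares),
           ])
       }
   }
   ```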




[GitHub] [arrow-datafusion] doki23 commented on pull request #2177: udtf support

Posted by GitBox <gi...@apache.org>.
doki23 commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1094208748

   Hmm, is `TableFunction` an expression 🤔? 
   Refer to https://docs.snowflake.com/en/sql-reference/functions-table.html
   The SQL usually looks like:
   ```sql
   select doi.date as "Date", record_temperatures.city, record_temperatures.temperature
       from dates_of_interest as doi,
            table(record_high_temperatures_for_date(doi.date)) as record_temperatures;
   ```
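   The `table(...)` call above behaves like a lateral join: the function is evaluated once per row of the outer relation, and each outer row is paired with every row the function returns. A generic std-only sketch of that pairing (the name `lateral_join` is hypothetical, and the table function is modeled as a closure returning a `Vec` of rows):

   ```rust
   /// Hypothetical lateral-join helper: evaluates `table_fn` once per outer row
   /// and pairs that row with every row the function returns.
   pub fn lateral_join<L: Clone, R>(outer: &[L], table_fn: impl Fn(&L) -> Vec<R>) -> Vec<(L, R)> {
       outer
           .iter()
           .flat_map(|row| {
               table_fn(row)
                   .into_iter()
                   .map(move |fn_row| (row.clone(), fn_row))
           })
           .collect()
   }
   ```

   An outer row for which the function returns no rows disappears from the result, as in an inner join.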




[GitHub] [arrow-datafusion] gandronchik commented on pull request #2177: udtf support

Posted by GitBox <gi...@apache.org>.
gandronchik commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1100273504

   What about `Result<Vec<ColumnarValue>>`? I have already almost implemented it this way :)




[GitHub] [arrow-datafusion] alamb commented on pull request #2177: udtf support

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1110223901

   > @alamb @thinkharderdev @doki23 i met the same problem in https://github.com/apache/arrow-datafusion/issues/2343
   
   I left some thoughts in 
   
   https://github.com/apache/arrow-datafusion/issues/2343#issuecomment-1110222756
   




[GitHub] [arrow-datafusion] doki23 commented on a diff in pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
doki23 commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861450604


##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   I guess that @gandronchik wants to chain each result (`ArrayRef`) of `TableFunctionImplementation` into a multi-column result (see the code in `TableFunStream::batch`), which may mean that the table UDF consists of multiple expressions. The reason is probably that the trait `PhysicalExpr` only provides `fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>`. But I agree that `Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>` is more appropriate.





[GitHub] [arrow-datafusion] gandronchik commented on a diff in pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
gandronchik commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861674286


##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   I updated the PR description. I hope it is clear enough now :)





[GitHub] [arrow-datafusion] xudong963 commented on pull request #2177: udtf support

Posted by GitBox <gi...@apache.org>.
xudong963 commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1093964538

   BTW, from clippy:
   ```
   error: unneeded `return` statement
      --> datafusion/core/src/physical_plan/functions.rs:752:9
       |
   752 |         return Ok(ColumnarValue::Array(result));
       |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: remove `return`: `Ok(ColumnarValue::Array(result))`
       |
       = note: `-D clippy::needless-return` implied by `-D warnings`
       = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return
   
   error: could not compile `datafusion` due to previous error
   warning: build failed, waiting for other jobs to finish...
   error: called `.nth(0)` on a `std::iter::Iterator`, when `.next()` is equivalent
       --> datafusion/core/src/execution/context.rs:3527:32
        |
   3527 |             let start_number = start_arr.into_iter().nth(0).unwrap().unwrap_or(0);
        |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try calling `.next()` instead of `.nth(0)`: `start_arr.into_iter().next()`
        |
        = note: `-D clippy::iter-nth-zero` implied by `-D warnings`
        = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#iter_nth_zero
   
   error: called `.nth(0)` on a `std::iter::Iterator`, when `.next()` is equivalent
       --> datafusion/core/src/execution/context.rs:3533:30
        |
   3533 |             let end_number = end_arr.into_iter().nth(0).unwrap().unwrap_or(0) + 1;
        |                              ^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try calling `.next()` instead of `.nth(0)`: `end_arr.into_iter().next()`
        |
        = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#iter_nth_zero
   
   error: build failed
   ```
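   For reference, the pattern clippy asks for, sketched with std types only. Assumption: the Arrow array iterator is modeled as any iterator of `Option<i64>`, and `first_or_zero`/`exclusive_end` are hypothetical helpers. It uses `.next()` instead of `.nth(0)` and a tail expression instead of `return`; unlike the flagged lines, an empty input returns 0 rather than panicking on `unwrap()`.

   ```rust
   /// Returns the first value, treating NULL (`None`) or an empty input as 0.
   /// Uses `.next()` rather than `.nth(0)` (clippy::iter_nth_zero).
   pub fn first_or_zero<I: IntoIterator<Item = Option<i64>>>(values: I) -> i64 {
       values.into_iter().next().flatten().unwrap_or(0)
   }

   /// The last expression is the return value, so no `return` is needed
   /// (clippy::needless_return).
   pub fn exclusive_end<I: IntoIterator<Item = Option<i64>>>(values: I) -> i64 {
       first_or_zero(values) + 1
   }
   ```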




[GitHub] [arrow-datafusion] alamb commented on pull request #2177: User Defined Table Function (udtf) support

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1148958334

   marking as draft until we figure out what to do with this




[GitHub] [arrow-datafusion] Ted-Jiang commented on a diff in pull request #2177: User Defined Table Function (udtf) support

Ted-Jiang commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r860542317


##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   As `ArrayRef` is one of the `ColumnarValue` variants:
   ```rust
   pub enum ColumnarValue {
       /// Array of values
       Array(ArrayRef),
       /// A single value
       Scalar(ScalarValue),
   }
   ```
   I think `TableFunctionImplementation` is essentially the same as `ScalarFunctionImplementation`, and it can only generate an N*1 table. If we use the signature proposed in https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1100368801:
   ```rust
   Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(Vec<ColumnarValue>, Vec<usize>)> + Send + Sync>;
   ```
   we could generate an N*M table.
   If I'm wrong, please correct me.
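   The std-only sketch below makes the N*1 vs N*M difference concrete. Following the PR description, it assumes the `Vec<usize>` holds one section size per input row. It uses `Vec<i64>` as a stand-in for an Arrow `ArrayRef`. `Column`, `TableOutput`, and `split_sections` are hypothetical names, not DataFusion APIs.

   ```rust
   /// Stand-in for one output column (an Arrow `ArrayRef` in the real signature).
   type Column = Vec<i64>;

   /// Hypothetical multi-column output: M columns of equal length plus one
   /// section size per input row, as in the `Vec<ColumnarValue>` proposal.
   struct TableOutput {
       columns: Vec<Column>,
       section_sizes: Vec<usize>,
   }

   /// Splits a flat column into per-input-row slices using the section sizes.
   fn split_sections<'a>(column: &'a [i64], sizes: &[usize]) -> Vec<&'a [i64]> {
       let mut out = Vec::with_capacity(sizes.len());
       let mut offset = 0;
       for &len in sizes {
           out.push(&column[offset..offset + len]);
           offset += len;
       }
       out
   }

   fn main() {
       // Single-column form `(ArrayRef, Vec<usize>)`: input row 0 produced 3 rows,
       // row 1 produced none, row 2 produced 2, giving an N*1 table.
       let flat: Column = vec![1, 2, 3, 10, 11];
       let sizes = vec![3, 0, 2];
       println!("{:?}", split_sections(&flat, &sizes));

       // Multi-column form: the same section sizes apply to every column,
       // giving an N*M table (here M = 2).
       let table = TableOutput {
           columns: vec![vec![1, 2, 3, 10, 11], vec![100, 200, 300, 1000, 1100]],
           section_sizes: sizes,
       };
       for col in &table.columns {
           // Every column must cover exactly the rows described by the sections.
           assert_eq!(col.len(), table.section_sizes.iter().sum::<usize>());
       }
       println!("{} columns x {} rows", table.columns.len(), table.columns[0].len());
   }
   ```

   The flat-array-plus-sizes layout avoids one allocation per input row, which is the performance argument given for the original signature. Extending it to M columns only requires sharing the same sizes vector across columns.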




[GitHub] [arrow-datafusion] alamb commented on pull request #2177: User Defined Table Function (udtf) support

alamb commented on PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1111365656

   I plan to give this a more careful review tomorrow



[GitHub] [arrow-datafusion] gandronchik commented on a diff in pull request #2177: udtf support

gandronchik commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r846963525


##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -99,6 +99,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
             Expr::ScalarUDF { fun, .. } => {
                 self.visit_volatility(fun.signature.volatility)
             }
+            Expr::TableUDF { fun, .. } => self.visit_volatility(fun.signature.volatility),

Review Comment:
   Good point; however, it doesn't work in this case, because the `fun` field has different types in `TableUDF` and `ScalarUDF`.
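   In Rust, a binding in an or-pattern (`A { fun, .. } | B { fun, .. }`) must have the same type in every alternative, so the two arms cannot be merged. The minimal sketch below shows the separate-arm form that does compile. It uses hypothetical, simplified stand-ins (`ScalarUdf`, `TableUdf`, `Signature`, `Volatility`), not DataFusion's actual definitions.

   ```rust
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum Volatility {
       Immutable,
       Volatile,
   }

   struct Signature {
       volatility: Volatility,
   }

   // Two distinct function types, each carrying its own signature.
   struct ScalarUdf {
       signature: Signature,
   }
   struct TableUdf {
       signature: Signature,
   }

   enum Expr {
       ScalarUDF { fun: ScalarUdf },
       TableUDF { fun: TableUdf },
   }

   // A merged arm `Expr::ScalarUDF { fun } | Expr::TableUDF { fun } => ...`
   // is rejected by the compiler: `fun` would be `&ScalarUdf` in one
   // alternative and `&TableUdf` in the other.
   fn volatility(expr: &Expr) -> Volatility {
       match expr {
           Expr::ScalarUDF { fun } => fun.signature.volatility,
           Expr::TableUDF { fun } => fun.signature.volatility,
       }
   }

   fn main() {
       let exprs = vec![
           Expr::ScalarUDF { fun: ScalarUdf { signature: Signature { volatility: Volatility::Immutable } } },
           Expr::TableUDF { fun: TableUdf { signature: Signature { volatility: Volatility::Volatile } } },
       ];
       for e in &exprs {
           println!("{:?}", volatility(e));
       }
   }
   ```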




[GitHub] [arrow-datafusion] alamb closed pull request #2177: User Defined Table Function (udtf) support

alamb closed pull request #2177: User Defined Table Function (udtf) support
URL: https://github.com/apache/arrow-datafusion/pull/2177
