You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/28 19:41:55 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2177: User Defined Table Function (udtf) support

alamb commented on code in PR #2177:
URL: https://github.com/apache/arrow-datafusion/pull/2177#discussion_r861245271


##########
datafusion/core/src/execution/context.rs:
##########
@@ -2115,6 +2141,195 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn user_defined_table_function() -> Result<()> {
+        let mut ctx = SessionContext::new();
+
+        let integer_series = integer_udtf();
+        ctx.register_udtf(create_udtf(
+            "integer_series",
+            vec![DataType::Int64, DataType::Int64],
+            Arc::new(DataType::Int64),
+            Volatility::Immutable,
+            integer_series,
+        ));
+
+        let struct_func = struct_udtf();
+        ctx.register_udtf(create_udtf(
+            "struct_func",
+            vec![DataType::Int64],
+            Arc::new(DataType::Struct(
+                [
+                    Field::new("f1", DataType::Utf8, false),
+                    Field::new("f2", DataType::Int64, false),
+                ]
+                .to_vec(),
+            )),
+            Volatility::Immutable,
+            struct_func,
+        ));
+
+        let result = plan_and_collect(&ctx, "SELECT struct_func(5)").await?;
+
+        let expected = vec![
+            "+-------------------------+",
+            "| struct_func(Int64(5))   |",
+            "+-------------------------+",
+            "| {\"f1\": \"test\", \"f2\": 5} |",
+            "+-------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(6,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(6),Int64(5)) |",
+            "+-----------------------------------+",
+            "+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(1,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(1),Int64(5)) |",
+            "+-----------------------------------+",
+            "| 1                                 |",
+            "| 2                                 |",
+            "| 3                                 |",
+            "| 4                                 |",
+            "| 5                                 |",
+            "+-----------------------------------+",
+        ];

Review Comment:
   This is a good example of a `UDTF` producing more rows than went in 👍 
   
   Would it be possible to write an example that also produces a different number of *columns* than went in? I think that is what @Ted-Jiang and I are pointing out in our comments below.



##########
datafusion/core/src/execution/context.rs:
##########
@@ -2115,6 +2141,195 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn user_defined_table_function() -> Result<()> {
+        let mut ctx = SessionContext::new();
+
+        let integer_series = integer_udtf();
+        ctx.register_udtf(create_udtf(
+            "integer_series",
+            vec![DataType::Int64, DataType::Int64],
+            Arc::new(DataType::Int64),
+            Volatility::Immutable,
+            integer_series,
+        ));
+
+        let struct_func = struct_udtf();
+        ctx.register_udtf(create_udtf(
+            "struct_func",
+            vec![DataType::Int64],
+            Arc::new(DataType::Struct(
+                [
+                    Field::new("f1", DataType::Utf8, false),
+                    Field::new("f2", DataType::Int64, false),
+                ]
+                .to_vec(),
+            )),
+            Volatility::Immutable,
+            struct_func,
+        ));
+
+        let result = plan_and_collect(&ctx, "SELECT struct_func(5)").await?;
+
+        let expected = vec![
+            "+-------------------------+",
+            "| struct_func(Int64(5))   |",
+            "+-------------------------+",
+            "| {\"f1\": \"test\", \"f2\": 5} |",
+            "+-------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(6,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(6),Int64(5)) |",
+            "+-----------------------------------+",
+            "+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(&ctx, "SELECT integer_series(1,5)").await?;
+
+        let expected = vec![
+            "+-----------------------------------+",
+            "| integer_series(Int64(1),Int64(5)) |",
+            "+-----------------------------------+",
+            "| 1                                 |",
+            "| 2                                 |",
+            "| 3                                 |",
+            "| 4                                 |",
+            "| 5                                 |",
+            "+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result = plan_and_collect(
+            &ctx,
+            "SELECT asd, struct_func(qwe), integer_series(asd, qwe), integer_series(1, qwe) r FROM (select 1 asd, 3 qwe UNION ALL select 2 asd, 4 qwe) x",
+        )
+        .await?;
+
+        let expected = vec![
+            "+-----+-------------------------+-----------------------------+---+",
+            "| asd | struct_func(x.qwe)      | integer_series(x.asd,x.qwe) | r |",
+            "+-----+-------------------------+-----------------------------+---+",
+            "| 1   | {\"f1\": \"test\", \"f2\": 3} | 1                           | 1 |",
+            "| 1   |                         | 2                           | 2 |",
+            "| 1   |                         | 3                           | 3 |",
+            "| 2   | {\"f1\": \"test\", \"f2\": 4} | 2                           | 1 |",
+            "| 2   |                         | 3                           | 2 |",
+            "| 2   |                         | 4                           | 3 |",
+            "| 2   |                         |                             | 4 |",
+            "+-----+-------------------------+-----------------------------+---+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        let result =
+            plan_and_collect(&ctx, "SELECT * from integer_series(1,5) pos(n)").await?;

Review Comment:
   Can you explain what this test is supposed to be demonstrating? I am not quite sure what it shows



##########
datafusion/expr/src/function.rs:
##########
@@ -39,6 +40,10 @@ use std::sync::Arc;
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
+/// Table function. Second tuple
+pub type TableFunctionImplementation =
+    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;

Review Comment:
   I am also a little mystified by this signature. It looks like "Second tuple" was the start of a thought that didn't get finished? I also don't understand what the `Vec<usize>` in the returned tuple represents -- perhaps you can add some comments explaining its purpose?
   
   Also, I agree with @Ted-Jiang's analysis -- I would expect this signature to return a "table" (i.e. a `RecordBatch`, or a `Vec<ColumnarValue>` if preferred).
   
   Perhaps something like
   
   ```rust
   Arc<dyn Fn(&[ColumnarValue]) -> Result<RecordBatch> + Send + Sync>;
   ```
   
   or
   
   ```rust
   Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>;
   ```
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org