Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/09 12:56:24 UTC

[GitHub] [arrow-datafusion] msathis opened a new pull request #1969: Coalesce function - WIP

msathis opened a new pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969


   # Which issue does this PR close?
   
   Closes #1890.
   
   # Rationale for this change
   To support the `coalesce` function in SQL.
   
   # What changes are included in this PR?
   Basic `coalesce` support. For now every argument is treated as a string; more work is needed to support other data types properly.
   
   # Are there any user-facing changes?
   Users can now use the `coalesce` function in SQL.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] msathis commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
msathis commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1086682914


   > @msathis If the pr is ready to review, please at me. Thanks.
   
   @liukun4515 Your review would be great! I think the PR is ready. My knowledge is limited, and I feel more optimisation is possible.





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r841061741



##########
File path: datafusion/core/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",

Review comment:
       👍 

##########
File path: datafusion/core/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "|                           |",
+        "+---------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_empty_value() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "SELECT COALESCE('', 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_value_with_null() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "SELECT COALESCE(NULL, 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-----------------------------------+",
+        "| coalesce(Utf8(NULL),Utf8(\"test\")) |",
+        "+-----------------------------------+",
+        "| test                              |",
+        "+-----------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result_with_default_value() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2, '-1') FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+--------------------------------------+",
+        "| coalesce(test.c1,test.c2,Utf8(\"-1\")) |",
+        "+--------------------------------------+",
+        "| 0                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| -1                                   |",
+        "+--------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_sum_with_default_value() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(1), None, Some(1), None])),
+            Arc::new(Int32Array::from(vec![Some(2), Some(2), None, None])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT SUM(COALESCE(c1, c2, 0)) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-----------------------------------------+",
+        "| SUM(coalesce(test.c1,test.c2,Int64(0))) |",
+        "+-----------------------------------------+",
+        "| 4                                       |",
+        "+-----------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_mul_with_default_value() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(1), None, Some(1), None])),
+            Arc::new(Int32Array::from(vec![Some(2), Some(2), None, None])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1 * c2, 0) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------------------+",
+        "| coalesce(test.c1 Multiply test.c2,Int64(0)) |",
+        "+---------------------------------------------+",
+        "| 2                                           |",
+        "| 0                                           |",
+        "| 0                                           |",
+        "| 0                                           |",
+        "+---------------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())

Review comment:
       nice testing 👍 

##########
File path: datafusion/expr/src/expr_fn.rs
##########
@@ -297,6 +297,14 @@ pub fn array(args: Vec<Expr>) -> Expr {
     }
 }
 
+/// Returns an coalesce of fixed size with each argument on it.

Review comment:
       ```suggestion
   /// Returns `coalesce(args...)`, which evaluates to the value of the first [Expr] 
   /// which is not NULL
   ```

##########
File path: datafusion/core/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "|                           |",
+        "+---------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_empty_value() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "SELECT COALESCE('', 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_value_with_null() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "SELECT COALESCE(NULL, 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-----------------------------------+",
+        "| coalesce(Utf8(NULL),Utf8(\"test\")) |",
+        "+-----------------------------------+",
+        "| test                              |",
+        "+-----------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result_with_default_value() -> Result<()> {

Review comment:
       Stylistically, this might be better if it were right next to `coalesce_result`, given its similarity.
   
   For that matter, since it uses the same data setup, it might be cool to combine the two tests (and run two different queries in the combined test)

##########
File path: datafusion/core/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "|                           |",
+        "+---------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_empty_value() -> Result<()> {

Review comment:
       this test seems to be the same as `coalesce_plan` -- is that intentional?

##########
File path: datafusion/core/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "|                           |",
+        "+---------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_empty_value() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "SELECT COALESCE('', 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_value_with_null() -> Result<()> {

Review comment:
       👍 

##########
File path: datafusion/physical-expr/src/conditional_expressions.rs
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{new_null_array, Array, BooleanArray};
+use arrow::compute;
+use arrow::compute::kernels::zip::zip;
+use arrow::datatypes::DataType;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::ColumnarValue;
+
+pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {

Review comment:
       ```suggestion
   /// coalesce evaluates to the first value which is not NULL
   pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
   ```
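
To illustrate the semantics described in this suggestion, here is a minimal sketch in plain Rust. It is not the PR's Arrow-based implementation: it models nullable Int32 columns as `Vec<Option<i32>>`, the helper name `coalesce_columns` is hypothetical, and all columns are assumed to have the same length.

```rust
/// Row-wise coalesce over nullable Int32 columns: for each row, take the
/// first value that is not NULL (`None`), scanning the columns left to right.
/// Hypothetical helper for illustration only; not the code under review.
fn coalesce_columns(columns: &[Vec<Option<i32>>]) -> Vec<Option<i32>> {
    // Assumption: every column has the same number of rows as the first one.
    let num_rows = columns.first().map_or(0, |c| c.len());
    (0..num_rows)
        // `find_map` stops at the first column whose value in this row is `Some`.
        .map(|row| columns.iter().find_map(|col| col[row]))
        .collect()
}
```

Applied to the `c1` and `c2` columns from the `coalesce_result` test, this yields 0, 1, 1, 1, NULL, which matches the expected table in that test.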







[GitHub] [arrow-datafusion] alamb commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1074305800


   > @yjshen Yup. Thanks for pointing out. I'll add it to Ballista with the optimization @Dandandan suggested
   
   
   @msathis do you plan to do this as a follow-on PR or in this PR?
   
   In other words is this PR ready for review again or are you still working on it?





[GitHub] [arrow-datafusion] alamb commented on pull request #1969: Add Coalesce function

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1086837851


   Thanks @msathis  --  I think this is looking great





[GitHub] [arrow-datafusion] alamb commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1086615394


   I will wait for @liukun4515  to review as well prior to merging





[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r835831482



##########
File path: datafusion/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "|                           |",
+        "+---------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_empty_value() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT COALESCE('', 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_value_with_null() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT COALESCE(NULL, 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+-----------------------------------+",
+        "| coalesce(Utf8(NULL),Utf8(\"test\")) |",
+        "+-----------------------------------+",
+        "| test                              |",
+        "+-----------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result_with_default_value() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2, '-1') FROM test";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+--------------------------------------+",
+        "| coalesce(test.c1,test.c2,Utf8(\"-1\")) |",
+        "+--------------------------------------+",
+        "| 0                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| -1                                   |",
+        "+--------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_sum_with_default_value() -> Result<()> {

Review comment:
       you can try this case
   
   ```
   select COALESCE(sum(c3),0) from table group by xxx
   ```
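
As a rough illustration of what the suggested query is expected to compute, here is a sketch in plain Rust rather than DataFusion. It assumes standard SQL behaviour, where `SUM` skips NULL inputs and returns NULL for a group with no non-NULL values, so that `COALESCE(..., 0)` turns that NULL into 0. The function name `coalesce_sum_by_group` and the `(key, value)` row shape are hypothetical stand-ins for the placeholders `xxx` and `c3` in the query.

```rust
use std::collections::BTreeMap;

/// Sketch of `SELECT key, COALESCE(SUM(value), 0) ... GROUP BY key`.
/// Hypothetical helper for illustration only.
fn coalesce_sum_by_group(rows: &[(&str, Option<i64>)]) -> BTreeMap<String, i64> {
    // Per-group SUM, kept as an Option so that "all inputs NULL" (None)
    // stays distinguishable from a real sum of 0.
    let mut sums: BTreeMap<String, Option<i64>> = BTreeMap::new();
    for (key, value) in rows {
        let entry = sums.entry(key.to_string()).or_insert(None);
        if let Some(v) = value {
            // NULL inputs are skipped; non-NULL inputs are accumulated.
            *entry = Some(entry.unwrap_or(0) + v);
        }
    }
    // COALESCE(sum, 0): replace a NULL sum with the default value.
    sums.into_iter().map(|(k, s)| (k, s.unwrap_or(0))).collect()
}
```

The case worth covering in a test is a group whose values are all NULL: without the `COALESCE` it would produce NULL, and with it the result should be 0.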







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r833839747



##########
File path: datafusion/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "|                           |",
+        "+---------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_empty_value() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT COALESCE('', 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_value_with_null() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT COALESCE(NULL, 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+-----------------------------------+",
+        "| coalesce(Utf8(NULL),Utf8(\"test\")) |",
+        "+-----------------------------------+",
+        "| test                              |",
+        "+-----------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result_with_default_value() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2, '-1') FROM test";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+--------------------------------------+",
+        "| coalesce(test.c1,test.c2,Utf8(\"-1\")) |",
+        "+--------------------------------------+",
+        "| 0                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| -1                                   |",
+        "+--------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_sum_with_default_value() -> Result<()> {

Review comment:
       It would be better to add another case with an aggregate function, taken from https://github.com/apache/arrow-datafusion/issues/2067#issuecomment-1076759264
   ```
   select COALESCE(sum(c3),0) from table
   ```
   where all values of c3 are null.
   @msathis 
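
To illustrate what such a test would pin down, here is a minimal std-only sketch. It assumes standard SQL semantics, where SUM over only NULL inputs yields NULL. `sql_sum` and `coalesce2` are hypothetical helper names used for illustration and are not DataFusion APIs.

```rust
/// SQL-style SUM (assumed semantics): NULL inputs are skipped, and the
/// result is NULL (`None`) when there is no non-NULL input at all.
fn sql_sum(values: &[Option<i64>]) -> Option<i64> {
    values
        .iter()
        .flatten()
        .fold(None, |acc: Option<i64>, &v| Some(acc.unwrap_or(0) + v))
}

/// Two-argument COALESCE on single values: the first non-NULL argument wins.
fn coalesce2(first: Option<i64>, fallback: Option<i64>) -> Option<i64> {
    first.or(fallback)
}
```

With every value of c3 null, `sql_sum` returns `None`, so `coalesce2(sql_sum(..), Some(0))` falls back to `Some(0)`. That fallback is the case the suggested test should cover.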
   
   







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r833836718



##########
File path: datafusion/src/physical_plan/functions.rs
##########
@@ -111,6 +112,11 @@ pub fn return_type(
             utf8_to_int_type(&input_expr_types[0], "character_length")
         }
         BuiltinScalarFunction::Chr => Ok(DataType::Utf8),
+        BuiltinScalarFunction::Coalesce => {
+            // COALESCE has multiple args and they might get coerced, get a preview of this
+            let coerced_types = data_types(input_expr_types, &signature(fun));

Review comment:
       Is there any doc or reference that describes how the common result type (the coercion type) is chosen?







[GitHub] [arrow-datafusion] yjshen commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1069238968


   @msathis Could you please also add this to Ballista, as #2008 did?





[GitHub] [arrow-datafusion] msathis commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
msathis commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1077435494


   > R ready for review again or are you still working on it?
   
   Hi @alamb, sorry, I got busy with some office work. I'll try to revise the PR in the next couple of days. I am planning to address all the performance optimisations @Dandandan suggested.





[GitHub] [arrow-datafusion] msathis commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
msathis commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r834113059



##########
File path: datafusion/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "|                           |",
+        "+---------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_empty_value() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT COALESCE('', 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_value_with_null() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT COALESCE(NULL, 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+-----------------------------------+",
+        "| coalesce(Utf8(NULL),Utf8(\"test\")) |",
+        "+-----------------------------------+",
+        "| test                              |",
+        "+-----------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result_with_default_value() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2, '-1') FROM test";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+--------------------------------------+",
+        "| coalesce(test.c1,test.c2,Utf8(\"-1\")) |",
+        "+--------------------------------------+",
+        "| 0                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| -1                                   |",
+        "+--------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_sum_with_default_value() -> Result<()> {

Review comment:
       Makes sense. I'll update the test case!







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r833839747



##########
File path: datafusion/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "|                           |",
+        "+---------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_empty_value() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT COALESCE('', 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_value_with_null() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT COALESCE(NULL, 'test')";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+-----------------------------------+",
+        "| coalesce(Utf8(NULL),Utf8(\"test\")) |",
+        "+-----------------------------------+",
+        "| test                              |",
+        "+-----------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result_with_default_value() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2, '-1') FROM test";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+--------------------------------------+",
+        "| coalesce(test.c1,test.c2,Utf8(\"-1\")) |",
+        "+--------------------------------------+",
+        "| 0                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| 1                                    |",
+        "| -1                                   |",
+        "+--------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_sum_with_default_value() -> Result<()> {

Review comment:
       It would be better to add another case with an aggregate function, taken from https://github.com/apache/arrow-datafusion/issues/2067#issuecomment-1076780150
   ```
   select COALESCE(sum(c3),0) from table
   ```
   where all values of c3 are null.
   @msathis 
   
   







[GitHub] [arrow-datafusion] msathis commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
msathis commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r839429696



##########
File path: datafusion/physical-expr/src/conditional_expressions.rs
##########
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{new_null_array, Array};
+use arrow::compute;
+use arrow::compute::kernels::zip::zip;
+use arrow::datatypes::DataType;
+
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::ColumnarValue;
+
+pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    // do not accept 0 arguments.
+    if args.is_empty() {
+        return Err(DataFusionError::Internal(format!(
+            "coalesce was called with {} arguments. It requires at least 1.",
+            args.len()
+        )));
+    }
+
+    let size = match args[0] {
+        ColumnarValue::Array(ref a) => a.len(),
+        ColumnarValue::Scalar(ref _s) => 1,
+    };
+    let mut res = new_null_array(&args[0].data_type(), size);
+
+    for column_value in args {
+        for i in 0..size {
+            match column_value {
+                ColumnarValue::Array(array_ref) => {
+                    let bool_arr = compute::is_not_null(array_ref)?;
+                    res = zip(&bool_arr, array_ref, &res)?;
+                }
+                ColumnarValue::Scalar(scalar) => {
+                    if !scalar.is_null() {
+                        // TODO: Figure out how to set value at index

Review comment:
       Hi @alamb, I need your help here. I couldn't figure out how to set a value at a particular index of an `arrow::Array`.







[GitHub] [arrow-datafusion] msathis commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
msathis commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r840540700



##########
File path: datafusion/physical-expr/src/conditional_expressions.rs
##########
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{new_null_array, Array};
+use arrow::compute;
+use arrow::compute::kernels::zip::zip;
+use arrow::datatypes::DataType;
+
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::ColumnarValue;
+
+pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    // do not accept 0 arguments.
+    if args.is_empty() {
+        return Err(DataFusionError::Internal(format!(
+            "coalesce was called with {} arguments. It requires at least 1.",
+            args.len()
+        )));
+    }
+
+    let size = match args[0] {
+        ColumnarValue::Array(ref a) => a.len(),
+        ColumnarValue::Scalar(ref _s) => 1,
+    };
+    let mut res = new_null_array(&args[0].data_type(), size);
+
+    for column_value in args {
+        for i in 0..size {
+            match column_value {
+                ColumnarValue::Array(array_ref) => {
+                    let bool_arr = compute::is_not_null(array_ref)?;
+                    res = zip(&bool_arr, array_ref, &res)?;
+                }
+                ColumnarValue::Scalar(scalar) => {
+                    if !scalar.is_null() {
+                        // TODO: Figure out how to set value at index

Review comment:
       Thanks @alamb, I think that worked.







[GitHub] [arrow-datafusion] alamb commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1077504603


   Thank you @msathis -- makes total sense. Let us know if you need any help!





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r839947162



##########
File path: datafusion/physical-expr/src/conditional_expressions.rs
##########
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{new_null_array, Array};
+use arrow::compute;
+use arrow::compute::kernels::zip::zip;
+use arrow::datatypes::DataType;
+
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::ColumnarValue;
+
+pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    // do not accept 0 arguments.
+    if args.is_empty() {
+        return Err(DataFusionError::Internal(format!(
+            "coalesce was called with {} arguments. It requires at least 1.",
+            args.len()
+        )));
+    }
+
+    let size = match args[0] {
+        ColumnarValue::Array(ref a) => a.len(),
+        ColumnarValue::Scalar(ref _s) => 1,
+    };
+    let mut res = new_null_array(&args[0].data_type(), size);
+
+    for column_value in args {
+        for i in 0..size {
+            match column_value {
+                ColumnarValue::Array(array_ref) => {
+                    let bool_arr = compute::is_not_null(array_ref)?;
+                    res = zip(&bool_arr, array_ref, &res)?;
+                }
+                ColumnarValue::Scalar(scalar) => {
+                    if !scalar.is_null() {
+                        // TODO: Figure out how to set value at index

Review comment:
       I think you may want to call `scalar.to_array_of_size(size)` here to create the appropriate array:
   
   https://github.com/alamb/arrow-datafusion/blob/41b4e49/datafusion/common/src/scalar.rs#L1075-L1076
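
The idea of broadcasting the scalar and then selecting per row can be sketched without Arrow at all. The following is a std-only model under stated assumptions, not the Arrow or DataFusion API: a column is a `Vec<Option<i32>>`, and `scalar_to_column`, `zip_columns` and `fill_nulls_with_scalar` are hypothetical stand-ins for `to_array_of_size`, a zip-style kernel, and the scalar branch of `coalesce`.

```rust
/// Std-only stand-in for an Arrow array: one `Option` per row, `None` meaning NULL.
type Column = Vec<Option<i32>>;

/// Hypothetical analogue of `to_array_of_size`: repeat one value `size` times.
fn scalar_to_column(scalar: Option<i32>, size: usize) -> Column {
    vec![scalar; size]
}

/// Hypothetical analogue of a zip kernel: take `truthy[i]` where `mask[i]`
/// is true, otherwise `falsy[i]`. All three inputs are assumed equally long.
fn zip_columns(mask: &[bool], truthy: &[Option<i32>], falsy: &[Option<i32>]) -> Column {
    mask.iter()
        .zip(truthy.iter().zip(falsy.iter()))
        .map(|(&m, (&t, &f))| if m { t } else { f })
        .collect()
}

/// Fill only the rows of `current` that are still NULL with the scalar argument.
fn fill_nulls_with_scalar(current: &[Option<i32>], scalar: Option<i32>) -> Column {
    let broadcast = scalar_to_column(scalar, current.len());
    let still_null: Vec<bool> = current.iter().map(|v| v.is_none()).collect();
    zip_columns(&still_null, &broadcast, current)
}
```

Note that the mask here marks rows of the running result that are still NULL, so a value found in an earlier argument is never overwritten by a later one.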







[GitHub] [arrow-datafusion] msathis commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
msathis commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r841080274



##########
File path: datafusion/core/tests/sql/functions.rs
##########
@@ -174,3 +174,202 @@ async fn query_count_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn coalesce_plan() -> Result<()> {
+    let ctx = SessionContext::new();
+    let sql = "select COALESCE('', 'test')";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------------+",
+        "| coalesce(Utf8(\"\"),Utf8(\"test\")) |",
+        "+---------------------------------+",
+        "|                                 |",
+        "+---------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_result() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![Some(0), None, Some(1), None, None])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                Some(1),
+                None,
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+    let sql = "SELECT COALESCE(c1, c2) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------------+",
+        "| coalesce(test.c1,test.c2) |",
+        "+---------------------------+",
+        "| 0                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "| 1                         |",
+        "|                           |",
+        "+---------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn coalesce_static_empty_value() -> Result<()> {

Review comment:
       It's an oversight, you're right. I'll remove `coalesce_plan`.







[GitHub] [arrow-datafusion] msathis commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
msathis commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1086681876


   @alamb Updated the PR to address the review comments





[GitHub] [arrow-datafusion] msathis commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
msathis commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1068259918


   @andygrove @xudong963 Please take a look & review when you find time.





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#discussion_r827322220



##########
File path: datafusion-physical-expr/src/conditional_expressions.rs
##########
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::Array;
+use arrow::datatypes::DataType;
+
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::ColumnarValue;
+
+pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    // do not accept 0 arguments.
+    if args.is_empty() {
+        return Err(DataFusionError::Internal(format!(
+            "coalesce was called with {} arguments. It requires at least 1.",
+            args.len()
+        )));
+    }
+
+    let mut res = vec![];
+    let size = match args[0] {
+        ColumnarValue::Array(ref a) => a.len(),
+        ColumnarValue::Scalar(ref _s) => 1,
+    };
+
+    for i in 0..size {
+        let mut value = ScalarValue::try_from(&args[0].data_type())?;
+        for column_value in args {
+            match column_value {
+                ColumnarValue::Array(array_ref) => {
+                    if array_ref.is_valid(i) {

Review comment:
   This implementation can likely be optimized (I think it should be >10x faster) by operating directly on the arrays instead of converting each value to `ScalarValue`, calling `is_valid`, and matching per value. That would also allow optimizations such as skipping the coalesce altogether if all items in the array are already non-null, or skipping an array if every one of its values is null.
   
   This will involve some more work and code, but a comment noting that some big optimizations are possible could be useful.
   
   The current implementation could also likely be optimized a bit already by moving the `for i in 0..size` loop from the outermost position to the innermost one, and/or by implementing one of the rules mentioned above.
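
A minimal sketch of the column-at-a-time idea described above, not the PR's code. It assumes plain `Vec<Option<i64>>` values as a stand-in for nullable Arrow arrays so that it runs without the `arrow` crate; the `Column` alias and `coalesce_columns` are hypothetical names used only for illustration.

```rust
/// Hypothetical stand-in for a nullable Arrow array: `None` marks a null slot.
type Column = Vec<Option<i64>>;

/// Column-at-a-time coalesce: for every row, keep the first non-null value
/// found when scanning `columns` from left to right.
/// Returns `None` when called with zero columns.
fn coalesce_columns(columns: &[Column]) -> Option<Column> {
    let (first, rest) = columns.split_first()?;
    let mut result = first.clone();

    // Number of rows that are still null in the result.
    let mut remaining = result.iter().filter(|v| v.is_none()).count();

    for column in rest {
        // Rule 1: nothing left to fill, so the remaining columns can be skipped.
        if remaining == 0 {
            break;
        }
        assert_eq!(column.len(), result.len(), "all columns must have the same length");

        // Rule 2: a column that is entirely null cannot contribute anything.
        if column.iter().all(|v| v.is_none()) {
            continue;
        }

        // The per-row loop is now the innermost loop.
        for (slot, candidate) in result.iter_mut().zip(column) {
            if slot.is_none() && candidate.is_some() {
                *slot = *candidate;
                remaining -= 1;
            }
        }
    }
    Some(result)
}

fn main() {
    let a = vec![Some(1), None, None];
    let b = vec![None, Some(2), None];
    let c = vec![Some(7), Some(8), Some(9)];
    let out = coalesce_columns(&[a, b, c]).expect("at least one column");
    println!("{:?}", out); // prints [Some(1), Some(2), Some(9)]
}
```

With real Arrow arrays, the null counts would presumably come from the arrays' validity bitmaps rather than from a scan, which is where most of the expected speedup would come from.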







[GitHub] [arrow-datafusion] liukun4515 commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1086519519


   @msathis If the PR is ready for review, please @-mention me.
   Thanks.





[GitHub] [arrow-datafusion] msathis commented on pull request #1969: Coalesce function

Posted by GitBox <gi...@apache.org>.
msathis commented on pull request #1969:
URL: https://github.com/apache/arrow-datafusion/pull/1969#issuecomment-1069322462


   @yjshen Yup. Thanks for pointing that out. I'll add it to Ballista with the optimization @Dandandan suggested.

