You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/12 11:15:26 UTC

[arrow-datafusion] branch main updated: Support null values in Avro string columns (#6307)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b9e5c071ec Support null values in Avro string columns (#6307)
b9e5c071ec is described below

commit b9e5c071ece14d889e4441c49cb2ed932c526d1f
Author: Ronen Cohen <ro...@coralogix.com>
AuthorDate: Fri May 12 14:15:19 2023 +0300

    Support null values in Avro string columns (#6307)
    
    * Support null values in Avro string columns
    
    * Temporarily change arrow-testing to fork
    
    * Update testing submodule
    
    * Switch back to apache/arrow-testing
    
    * Fix arrow-testing URL
---
 .../core/src/avro_to_arrow/arrow_array_reader.rs   | 27 ++++---
 datafusion/core/src/datasource/file_format/avro.rs | 85 ++++++++++++++++++++++
 testing                                            |  2 +-
 3 files changed, 101 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
index a4254c86ce..0b7f9ca583 100644
--- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -275,14 +275,13 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                 } else if let Value::Array(n) = value {
                     n.iter()
                         .map(resolve_string)
-                        .collect::<ArrowResult<Vec<String>>>()?
+                        .collect::<ArrowResult<Vec<Option<String>>>>()?
                         .into_iter()
-                        .map(Some)
                         .collect::<Vec<Option<String>>>()
                 } else if let Value::Null = value {
                     vec![None]
                 } else if !matches!(value, Value::Record(_)) {
-                    vec![Some(resolve_string(value)?)]
+                    vec![resolve_string(value)?]
                 } else {
                     return Err(SchemaError(
                         "Only scalars are currently supported in Avro arrays".to_string(),
@@ -351,7 +350,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
             self.build_string_dictionary_builder(rows.len());
         for row in rows {
             if let Some(value) = self.field_lookup(col_name, row) {
-                if let Ok(str_v) = resolve_string(value) {
+                if let Ok(Some(str_v)) = resolve_string(value) {
                     builder.append(str_v).map(drop)?
                 } else {
                     builder.append_null()
@@ -689,7 +688,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                         rows.iter()
                             .map(|row| {
                                 let maybe_value = self.field_lookup(field.name(), row);
-                                maybe_value.map(resolve_string).transpose()
+                                match maybe_value {
+                                    None => Ok(None),
+                                    Some(v) => resolve_string(v),
+                                }
                             })
                             .collect::<ArrowResult<StringArray>>()?,
                     )
@@ -841,12 +843,12 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
             if let Value::Array(values) = row {
                 values
                     .iter()
-                    .map(|s| resolve_string(s).ok())
+                    .map(|s| resolve_string(s).ok().flatten())
                     .collect::<Vec<Option<_>>>()
             } else if let Value::Null = row {
                 vec![]
             } else {
-                vec![resolve_string(row).ok()]
+                vec![resolve_string(row).ok().flatten()]
             }
         })
         .collect::<Vec<Option<_>>>()
@@ -855,13 +857,14 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
 /// Reads an Avro value as a string, regardless of its type.
 /// This is useful if the expected datatype is a string, in which case we preserve
 /// all the values regardless of their type.
-fn resolve_string(v: &Value) -> ArrowResult<String> {
+fn resolve_string(v: &Value) -> ArrowResult<Option<String>> {
     let v = if let Value::Union(_, b) = v { b } else { v };
     match v {
-        Value::String(s) => Ok(s.clone()),
-        Value::Bytes(bytes) => {
-            String::from_utf8(bytes.to_vec()).map_err(AvroError::ConvertToUtf8)
-        }
+        Value::String(s) => Ok(Some(s.clone())),
+        Value::Bytes(bytes) => String::from_utf8(bytes.to_vec())
+            .map_err(AvroError::ConvertToUtf8)
+            .map(Some),
+        Value::Null => Ok(None),
         other => Err(AvroError::GetString(other.into())),
     }
     .map_err(|e| SchemaError(format!("expected resolvable string : {e:?}")))
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 4227333089..374ef18970 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -96,6 +96,7 @@ mod tests {
     use crate::datasource::file_format::test_util::scan_format;
     use crate::physical_plan::collect;
     use crate::prelude::{SessionConfig, SessionContext};
+    use arrow::array::{as_string_array, Array};
     use datafusion_common::cast::{
         as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
         as_int32_array, as_timestamp_microsecond_array,
@@ -221,6 +222,27 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn read_null_bool_alltypes_plain_avro() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let task_ctx = state.task_ctx();
+        let projection = Some(vec![2]);
+        let exec =
+            get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
+
+        let batches = collect(exec, task_ctx).await?;
+        assert_eq!(batches.len(), 1);
+        assert_eq!(1, batches[0].num_columns());
+        assert_eq!(1, batches[0].num_rows());
+
+        let array = as_boolean_array(batches[0].column(0))?;
+
+        assert!(array.is_null(0));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_i32_alltypes_plain_avro() -> Result<()> {
         let session_ctx = SessionContext::new();
@@ -245,6 +267,27 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn read_null_i32_alltypes_plain_avro() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let task_ctx = state.task_ctx();
+        let projection = Some(vec![1]);
+        let exec =
+            get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
+
+        let batches = collect(exec, task_ctx).await?;
+        assert_eq!(batches.len(), 1);
+        assert_eq!(1, batches[0].num_columns());
+        assert_eq!(1, batches[0].num_rows());
+
+        let array = as_int32_array(batches[0].column(0))?;
+
+        assert!(array.is_null(0));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_i96_alltypes_plain_avro() -> Result<()> {
         let session_ctx = SessionContext::new();
@@ -350,6 +393,48 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn read_null_binary_alltypes_plain_avro() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let task_ctx = state.task_ctx();
+        let projection = Some(vec![6]);
+        let exec =
+            get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
+
+        let batches = collect(exec, task_ctx).await?;
+        assert_eq!(batches.len(), 1);
+        assert_eq!(1, batches[0].num_columns());
+        assert_eq!(1, batches[0].num_rows());
+
+        let array = as_binary_array(batches[0].column(0))?;
+
+        assert!(array.is_null(0));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_null_string_alltypes_plain_avro() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let task_ctx = state.task_ctx();
+        let projection = Some(vec![0]);
+        let exec =
+            get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
+
+        let batches = collect(exec, task_ctx).await?;
+        assert_eq!(batches.len(), 1);
+        assert_eq!(1, batches[0].num_columns());
+        assert_eq!(1, batches[0].num_rows());
+
+        let array = as_string_array(batches[0].column(0));
+
+        assert!(array.is_null(0));
+
+        Ok(())
+    }
+
     async fn get_exec(
         state: &SessionState,
         file_name: &str,
diff --git a/testing b/testing
index 47f7b56b25..e81d0c6de3 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 47f7b56b25683202c1fd957668e13f2abafc0f12
+Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e