You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in plain text.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/12 11:15:26 UTC
[arrow-datafusion] branch main updated: Support null values in Avro string columns (#6307)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b9e5c071ec Support null values in Avro string columns (#6307)
b9e5c071ec is described below
commit b9e5c071ece14d889e4441c49cb2ed932c526d1f
Author: Ronen Cohen <ro...@coralogix.com>
AuthorDate: Fri May 12 14:15:19 2023 +0300
Support null values in Avro string columns (#6307)
* Support null values in Avro string columns
* Temporarily change arrow-testing to fork
* Update testing submodule
* Switch back to apache/arrow-testing
* Fix arrow-testing URL
---
.../core/src/avro_to_arrow/arrow_array_reader.rs | 27 ++++---
datafusion/core/src/datasource/file_format/avro.rs | 85 ++++++++++++++++++++++
testing | 2 +-
3 files changed, 101 insertions(+), 13 deletions(-)
diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
index a4254c86ce..0b7f9ca583 100644
--- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -275,14 +275,13 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
} else if let Value::Array(n) = value {
n.iter()
.map(resolve_string)
- .collect::<ArrowResult<Vec<String>>>()?
+ .collect::<ArrowResult<Vec<Option<String>>>>()?
.into_iter()
- .map(Some)
.collect::<Vec<Option<String>>>()
} else if let Value::Null = value {
vec![None]
} else if !matches!(value, Value::Record(_)) {
- vec![Some(resolve_string(value)?)]
+ vec![resolve_string(value)?]
} else {
return Err(SchemaError(
"Only scalars are currently supported in Avro arrays".to_string(),
@@ -351,7 +350,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
self.build_string_dictionary_builder(rows.len());
for row in rows {
if let Some(value) = self.field_lookup(col_name, row) {
- if let Ok(str_v) = resolve_string(value) {
+ if let Ok(Some(str_v)) = resolve_string(value) {
builder.append(str_v).map(drop)?
} else {
builder.append_null()
@@ -689,7 +688,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
rows.iter()
.map(|row| {
let maybe_value = self.field_lookup(field.name(), row);
- maybe_value.map(resolve_string).transpose()
+ match maybe_value {
+ None => Ok(None),
+ Some(v) => resolve_string(v),
+ }
})
.collect::<ArrowResult<StringArray>>()?,
)
@@ -841,12 +843,12 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
if let Value::Array(values) = row {
values
.iter()
- .map(|s| resolve_string(s).ok())
+ .map(|s| resolve_string(s).ok().flatten())
.collect::<Vec<Option<_>>>()
} else if let Value::Null = row {
vec![]
} else {
- vec![resolve_string(row).ok()]
+ vec![resolve_string(row).ok().flatten()]
}
})
.collect::<Vec<Option<_>>>()
@@ -855,13 +857,14 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
/// Reads an Avro value as a string, regardless of its type.
/// This is useful if the expected datatype is a string, in which case we preserve
/// all the values regardless of they type.
-fn resolve_string(v: &Value) -> ArrowResult<String> {
+fn resolve_string(v: &Value) -> ArrowResult<Option<String>> {
let v = if let Value::Union(_, b) = v { b } else { v };
match v {
- Value::String(s) => Ok(s.clone()),
- Value::Bytes(bytes) => {
- String::from_utf8(bytes.to_vec()).map_err(AvroError::ConvertToUtf8)
- }
+ Value::String(s) => Ok(Some(s.clone())),
+ Value::Bytes(bytes) => String::from_utf8(bytes.to_vec())
+ .map_err(AvroError::ConvertToUtf8)
+ .map(Some),
+ Value::Null => Ok(None),
other => Err(AvroError::GetString(other.into())),
}
.map_err(|e| SchemaError(format!("expected resolvable string : {e:?}")))
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 4227333089..374ef18970 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -96,6 +96,7 @@ mod tests {
use crate::datasource::file_format::test_util::scan_format;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
+ use arrow::array::{as_string_array, Array};
use datafusion_common::cast::{
as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
as_int32_array, as_timestamp_microsecond_array,
@@ -221,6 +222,27 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn read_null_bool_alltypes_plain_avro() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let projection = Some(vec![2]);
+ let exec =
+ get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
+
+ let batches = collect(exec, task_ctx).await?;
+ assert_eq!(batches.len(), 1);
+ assert_eq!(1, batches[0].num_columns());
+ assert_eq!(1, batches[0].num_rows());
+
+ let array = as_boolean_array(batches[0].column(0))?;
+
+ assert!(array.is_null(0));
+
+ Ok(())
+ }
+
#[tokio::test]
async fn read_i32_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
@@ -245,6 +267,27 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn read_null_i32_alltypes_plain_avro() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let projection = Some(vec![1]);
+ let exec =
+ get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
+
+ let batches = collect(exec, task_ctx).await?;
+ assert_eq!(batches.len(), 1);
+ assert_eq!(1, batches[0].num_columns());
+ assert_eq!(1, batches[0].num_rows());
+
+ let array = as_int32_array(batches[0].column(0))?;
+
+ assert!(array.is_null(0));
+
+ Ok(())
+ }
+
#[tokio::test]
async fn read_i96_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
@@ -350,6 +393,48 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn read_null_binary_alltypes_plain_avro() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let projection = Some(vec![6]);
+ let exec =
+ get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
+
+ let batches = collect(exec, task_ctx).await?;
+ assert_eq!(batches.len(), 1);
+ assert_eq!(1, batches[0].num_columns());
+ assert_eq!(1, batches[0].num_rows());
+
+ let array = as_binary_array(batches[0].column(0))?;
+
+ assert!(array.is_null(0));
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn read_null_string_alltypes_plain_avro() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let projection = Some(vec![0]);
+ let exec =
+ get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
+
+ let batches = collect(exec, task_ctx).await?;
+ assert_eq!(batches.len(), 1);
+ assert_eq!(1, batches[0].num_columns());
+ assert_eq!(1, batches[0].num_rows());
+
+ let array = as_string_array(batches[0].column(0));
+
+ assert!(array.is_null(0));
+
+ Ok(())
+ }
+
async fn get_exec(
state: &SessionState,
file_name: &str,
diff --git a/testing b/testing
index 47f7b56b25..e81d0c6de3 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 47f7b56b25683202c1fd957668e13f2abafc0f12
+Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e