You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/10/22 10:47:22 UTC

[arrow-datafusion] branch master updated: Implement ScalarValue for FixedSizeBinary (#3911)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 998a7aa82 Implement ScalarValue for FixedSizeBinary (#3911)
998a7aa82 is described below

commit 998a7aa8296122cda3a06a8bf7a61c033c88d36b
Author: Max Burke <ma...@urbanlogiq.com>
AuthorDate: Sat Oct 22 03:47:16 2022 -0700

    Implement ScalarValue for FixedSizeBinary (#3911)
    
    * implement ScalarValue for FixedSizeBinary
    
    * Return an error instead of panicking
    
    * add tests for scalar value fixedsizebinary variant
---
 datafusion/common/src/scalar.rs                   |  79 +++++++++++++++++++++-
 datafusion/core/tests/parquet/test_binary.parquet | Bin 0 -> 8558 bytes
 datafusion/core/tests/sql/parquet.rs              |  22 ++++++
 datafusion/proto/src/to_proto.rs                  |   5 ++
 4 files changed, 105 insertions(+), 1 deletion(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 2b4b98964..0afb28c6b 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -77,6 +77,8 @@ pub enum ScalarValue {
     LargeUtf8(Option<String>),
     /// binary
     Binary(Option<Vec<u8>>),
+    /// fixed size binary
+    FixedSizeBinary(i32, Option<Vec<u8>>),
     /// large binary
     LargeBinary(Option<Vec<u8>>),
     /// list of nested ScalarValue
@@ -159,6 +161,8 @@ impl PartialEq for ScalarValue {
             (LargeUtf8(_), _) => false,
             (Binary(v1), Binary(v2)) => v1.eq(v2),
             (Binary(_), _) => false,
+            (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.eq(v2),
+            (FixedSizeBinary(_, _), _) => false,
             (LargeBinary(v1), LargeBinary(v2)) => v1.eq(v2),
             (LargeBinary(_), _) => false,
             (List(v1, t1), List(v2, t2)) => v1.eq(v2) && t1.eq(t2),
@@ -247,6 +251,8 @@ impl PartialOrd for ScalarValue {
             (LargeUtf8(_), _) => None,
             (Binary(v1), Binary(v2)) => v1.partial_cmp(v2),
             (Binary(_), _) => None,
+            (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.partial_cmp(v2),
+            (FixedSizeBinary(_, _), _) => None,
             (LargeBinary(v1), LargeBinary(v2)) => v1.partial_cmp(v2),
             (LargeBinary(_), _) => None,
             (List(v1, t1), List(v2, t2)) => {
@@ -536,6 +542,7 @@ impl std::hash::Hash for ScalarValue {
             Utf8(v) => v.hash(state),
             LargeUtf8(v) => v.hash(state),
             Binary(v) => v.hash(state),
+            FixedSizeBinary(_, v) => v.hash(state),
             LargeBinary(v) => v.hash(state),
             List(v, t) => {
                 v.hash(state);
@@ -900,6 +907,7 @@ impl ScalarValue {
             ScalarValue::Utf8(_) => DataType::Utf8,
             ScalarValue::LargeUtf8(_) => DataType::LargeUtf8,
             ScalarValue::Binary(_) => DataType::Binary,
+            ScalarValue::FixedSizeBinary(sz, _) => DataType::FixedSizeBinary(*sz),
             ScalarValue::LargeBinary(_) => DataType::LargeBinary,
             ScalarValue::List(_, field) => DataType::List(Box::new(Field::new(
                 "item",
@@ -987,6 +995,7 @@ impl ScalarValue {
             ScalarValue::Utf8(v) => v.is_none(),
             ScalarValue::LargeUtf8(v) => v.is_none(),
             ScalarValue::Binary(v) => v.is_none(),
+            ScalarValue::FixedSizeBinary(_, v) => v.is_none(),
             ScalarValue::LargeBinary(v) => v.is_none(),
             ScalarValue::List(v, _) => v.is_none(),
             ScalarValue::Date32(v) => v.is_none(),
@@ -1393,13 +1402,30 @@ impl ScalarValue {
                     _ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
                 }
             }
+            DataType::FixedSizeBinary(_) => {
+                let array = scalars
+                    .map(|sv| {
+                        if let ScalarValue::FixedSizeBinary(_, v) = sv {
+                            Ok(v)
+                        } else {
+                            Err(DataFusionError::Internal(format!(
+                                "Inconsistent types in ScalarValue::iter_to_array. \
+                                Expected {:?}, got {:?}",
+                                data_type, sv
+                            )))
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let array =
+                    FixedSizeBinaryArray::try_from_sparse_iter(array.into_iter())?;
+                Arc::new(array)
+            }
             // explicitly enumerate unsupported types so newly added
             // types must be aknowledged
             DataType::Float16
             | DataType::Time32(_)
             | DataType::Time64(_)
             | DataType::Duration(_)
-            | DataType::FixedSizeBinary(_)
             | DataType::FixedSizeList(_, _)
             | DataType::Interval(_)
             | DataType::LargeList(_)
@@ -1602,6 +1628,20 @@ impl ScalarValue {
                     Arc::new(repeat(None::<&str>).take(size).collect::<BinaryArray>())
                 }
             },
+            ScalarValue::FixedSizeBinary(_, e) => match e {
+                Some(value) => Arc::new(
+                    FixedSizeBinaryArray::try_from_sparse_iter(
+                        repeat(Some(value.as_slice())).take(size),
+                    )
+                    .unwrap(),
+                ),
+                None => Arc::new(
+                    FixedSizeBinaryArray::try_from_sparse_iter(
+                        repeat(None::<&[u8]>).take(size),
+                    )
+                    .unwrap(),
+                ),
+            },
             ScalarValue::LargeBinary(e) => match e {
                 Some(value) => Arc::new(
                     repeat(Some(value.as_slice()))
@@ -1887,6 +1927,23 @@ impl ScalarValue {
                 };
                 ScalarValue::new_list(value, nested_type.data_type().clone())
             }
+            DataType::FixedSizeBinary(_) => {
+                let array = array
+                    .as_any()
+                    .downcast_ref::<FixedSizeBinaryArray>()
+                    .unwrap();
+                let size = match array.data_type() {
+                    DataType::FixedSizeBinary(size) => *size,
+                    _ => unreachable!(),
+                };
+                ScalarValue::FixedSizeBinary(
+                    size,
+                    match array.is_null(index) {
+                        true => None,
+                        false => Some(array.value(index).into()),
+                    },
+                )
+            }
             other => {
                 return Err(DataFusionError::NotImplemented(format!(
                     "Can't create a scalar from array of type \"{:?}\"",
@@ -1973,6 +2030,9 @@ impl ScalarValue {
             ScalarValue::Binary(val) => {
                 eq_array_primitive!(array, index, BinaryArray, val)
             }
+            ScalarValue::FixedSizeBinary(_, val) => {
+                eq_array_primitive!(array, index, FixedSizeBinaryArray, val)
+            }
             ScalarValue::LargeBinary(val) => {
                 eq_array_primitive!(array, index, LargeBinaryArray, val)
             }
@@ -2317,6 +2377,17 @@ impl fmt::Display for ScalarValue {
                 )?,
                 None => write!(f, "NULL")?,
             },
+            ScalarValue::FixedSizeBinary(_, e) => match e {
+                Some(l) => write!(
+                    f,
+                    "{}",
+                    l.iter()
+                        .map(|v| format!("{}", v))
+                        .collect::<Vec<_>>()
+                        .join(",")
+                )?,
+                None => write!(f, "NULL")?,
+            },
             ScalarValue::LargeBinary(e) => match e {
                 Some(l) => write!(
                     f,
@@ -2397,6 +2468,12 @@ impl fmt::Debug for ScalarValue {
             ScalarValue::LargeUtf8(Some(_)) => write!(f, "LargeUtf8(\"{}\")", self),
             ScalarValue::Binary(None) => write!(f, "Binary({})", self),
             ScalarValue::Binary(Some(_)) => write!(f, "Binary(\"{}\")", self),
+            ScalarValue::FixedSizeBinary(size, None) => {
+                write!(f, "FixedSizeBinary({}, {})", size, self)
+            }
+            ScalarValue::FixedSizeBinary(size, Some(_)) => {
+                write!(f, "FixedSizeBinary({}, \"{}\")", size, self)
+            }
             ScalarValue::LargeBinary(None) => write!(f, "LargeBinary({})", self),
             ScalarValue::LargeBinary(Some(_)) => write!(f, "LargeBinary(\"{}\")", self),
             ScalarValue::List(_, _) => write!(f, "List([{}])", self),
diff --git a/datafusion/core/tests/parquet/test_binary.parquet b/datafusion/core/tests/parquet/test_binary.parquet
new file mode 100644
index 000000000..9d906bc49
Binary files /dev/null and b/datafusion/core/tests/parquet/test_binary.parquet differ
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 8bec4f1dd..c70466737 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -48,6 +48,28 @@ async fn parquet_query() {
     assert_batches_eq!(expected, &actual);
 }
 
+#[tokio::test]
+async fn fixed_size_binary_columns() {
+    let ctx = SessionContext::new();
+    ctx.register_parquet(
+        "t0",
+        "tests/parquet/test_binary.parquet",
+        ParquetReadOptions::default(),
+    )
+    .await
+    .unwrap();
+    let sql = "SELECT ids FROM t0 ORDER BY ids";
+    let plan = ctx.create_logical_plan(sql).unwrap();
+    let plan = ctx.optimize(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
+    let task_ctx = ctx.task_ctx();
+    let results = collect(plan, task_ctx).await.unwrap();
+    for batch in results {
+        assert_eq!(466, batch.num_rows());
+        assert_eq!(1, batch.num_columns());
+    }
+}
+
 #[tokio::test]
 async fn parquet_single_nan_schema() {
     let ctx = SessionContext::new();
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index e2874da1f..f8dab779b 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -1063,6 +1063,11 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
                     Value::LargeBinaryValue(s.to_owned())
                 })
             }
+            scalar::ScalarValue::FixedSizeBinary(_, _) => {
+                return Err(Error::General(
+                    "FixedSizeBinary is not yet implemented".to_owned(),
+                ))
+            }
 
             datafusion::scalar::ScalarValue::Time64(v) => {
                 create_proto_scalar(v, PrimitiveScalarType::Time64, |v| {