You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by al...@apache.org on 2022/10/22 10:47:22 UTC
[arrow-datafusion] branch master updated: Implement ScalarValue for FixedSizeBinary (#3911)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 998a7aa82 Implement ScalarValue for FixedSizeBinary (#3911)
998a7aa82 is described below
commit 998a7aa8296122cda3a06a8bf7a61c033c88d36b
Author: Max Burke <ma...@urbanlogiq.com>
AuthorDate: Sat Oct 22 03:47:16 2022 -0700
Implement ScalarValue for FixedSizeBinary (#3911)
* implement ScalarValue for FixedSizeBinary
* Return an error instead of panicking
* add tests for scalar value fixedsizebinary variant
---
datafusion/common/src/scalar.rs | 79 +++++++++++++++++++++-
datafusion/core/tests/parquet/test_binary.parquet | Bin 0 -> 8558 bytes
datafusion/core/tests/sql/parquet.rs | 22 ++++++
datafusion/proto/src/to_proto.rs | 5 ++
4 files changed, 105 insertions(+), 1 deletion(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 2b4b98964..0afb28c6b 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -77,6 +77,8 @@ pub enum ScalarValue {
LargeUtf8(Option<String>),
/// binary
Binary(Option<Vec<u8>>),
+ /// fixed size binary
+ FixedSizeBinary(i32, Option<Vec<u8>>),
/// large binary
LargeBinary(Option<Vec<u8>>),
/// list of nested ScalarValue
@@ -159,6 +161,8 @@ impl PartialEq for ScalarValue {
(LargeUtf8(_), _) => false,
(Binary(v1), Binary(v2)) => v1.eq(v2),
(Binary(_), _) => false,
+ (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.eq(v2),
+ (FixedSizeBinary(_, _), _) => false,
(LargeBinary(v1), LargeBinary(v2)) => v1.eq(v2),
(LargeBinary(_), _) => false,
(List(v1, t1), List(v2, t2)) => v1.eq(v2) && t1.eq(t2),
@@ -247,6 +251,8 @@ impl PartialOrd for ScalarValue {
(LargeUtf8(_), _) => None,
(Binary(v1), Binary(v2)) => v1.partial_cmp(v2),
(Binary(_), _) => None,
+ (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.partial_cmp(v2),
+ (FixedSizeBinary(_, _), _) => None,
(LargeBinary(v1), LargeBinary(v2)) => v1.partial_cmp(v2),
(LargeBinary(_), _) => None,
(List(v1, t1), List(v2, t2)) => {
@@ -536,6 +542,7 @@ impl std::hash::Hash for ScalarValue {
Utf8(v) => v.hash(state),
LargeUtf8(v) => v.hash(state),
Binary(v) => v.hash(state),
+ FixedSizeBinary(_, v) => v.hash(state),
LargeBinary(v) => v.hash(state),
List(v, t) => {
v.hash(state);
@@ -900,6 +907,7 @@ impl ScalarValue {
ScalarValue::Utf8(_) => DataType::Utf8,
ScalarValue::LargeUtf8(_) => DataType::LargeUtf8,
ScalarValue::Binary(_) => DataType::Binary,
+ ScalarValue::FixedSizeBinary(sz, _) => DataType::FixedSizeBinary(*sz),
ScalarValue::LargeBinary(_) => DataType::LargeBinary,
ScalarValue::List(_, field) => DataType::List(Box::new(Field::new(
"item",
@@ -987,6 +995,7 @@ impl ScalarValue {
ScalarValue::Utf8(v) => v.is_none(),
ScalarValue::LargeUtf8(v) => v.is_none(),
ScalarValue::Binary(v) => v.is_none(),
+ ScalarValue::FixedSizeBinary(_, v) => v.is_none(),
ScalarValue::LargeBinary(v) => v.is_none(),
ScalarValue::List(v, _) => v.is_none(),
ScalarValue::Date32(v) => v.is_none(),
@@ -1393,13 +1402,30 @@ impl ScalarValue {
_ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
}
}
+ DataType::FixedSizeBinary(_) => {
+ let array = scalars
+ .map(|sv| {
+ if let ScalarValue::FixedSizeBinary(_, v) = sv {
+ Ok(v)
+ } else {
+ Err(DataFusionError::Internal(format!(
+ "Inconsistent types in ScalarValue::iter_to_array. \
+ Expected {:?}, got {:?}",
+ data_type, sv
+ )))
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let array =
+ FixedSizeBinaryArray::try_from_sparse_iter(array.into_iter())?;
+ Arc::new(array)
+ }
// explicitly enumerate unsupported types so newly added
// types must be aknowledged
DataType::Float16
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Duration(_)
- | DataType::FixedSizeBinary(_)
| DataType::FixedSizeList(_, _)
| DataType::Interval(_)
| DataType::LargeList(_)
@@ -1602,6 +1628,20 @@ impl ScalarValue {
Arc::new(repeat(None::<&str>).take(size).collect::<BinaryArray>())
}
},
+ ScalarValue::FixedSizeBinary(_, e) => match e {
+ Some(value) => Arc::new(
+ FixedSizeBinaryArray::try_from_sparse_iter(
+ repeat(Some(value.as_slice())).take(size),
+ )
+ .unwrap(),
+ ),
+ None => Arc::new(
+ FixedSizeBinaryArray::try_from_sparse_iter(
+ repeat(None::<&[u8]>).take(size),
+ )
+ .unwrap(),
+ ),
+ },
ScalarValue::LargeBinary(e) => match e {
Some(value) => Arc::new(
repeat(Some(value.as_slice()))
@@ -1887,6 +1927,23 @@ impl ScalarValue {
};
ScalarValue::new_list(value, nested_type.data_type().clone())
}
+ DataType::FixedSizeBinary(_) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<FixedSizeBinaryArray>()
+ .unwrap();
+ let size = match array.data_type() {
+ DataType::FixedSizeBinary(size) => *size,
+ _ => unreachable!(),
+ };
+ ScalarValue::FixedSizeBinary(
+ size,
+ match array.is_null(index) {
+ true => None,
+ false => Some(array.value(index).into()),
+ },
+ )
+ }
other => {
return Err(DataFusionError::NotImplemented(format!(
"Can't create a scalar from array of type \"{:?}\"",
@@ -1973,6 +2030,9 @@ impl ScalarValue {
ScalarValue::Binary(val) => {
eq_array_primitive!(array, index, BinaryArray, val)
}
+ ScalarValue::FixedSizeBinary(_, val) => {
+ eq_array_primitive!(array, index, FixedSizeBinaryArray, val)
+ }
ScalarValue::LargeBinary(val) => {
eq_array_primitive!(array, index, LargeBinaryArray, val)
}
@@ -2317,6 +2377,17 @@ impl fmt::Display for ScalarValue {
)?,
None => write!(f, "NULL")?,
},
+ ScalarValue::FixedSizeBinary(_, e) => match e {
+ Some(l) => write!(
+ f,
+ "{}",
+ l.iter()
+ .map(|v| format!("{}", v))
+ .collect::<Vec<_>>()
+ .join(",")
+ )?,
+ None => write!(f, "NULL")?,
+ },
ScalarValue::LargeBinary(e) => match e {
Some(l) => write!(
f,
@@ -2397,6 +2468,12 @@ impl fmt::Debug for ScalarValue {
ScalarValue::LargeUtf8(Some(_)) => write!(f, "LargeUtf8(\"{}\")", self),
ScalarValue::Binary(None) => write!(f, "Binary({})", self),
ScalarValue::Binary(Some(_)) => write!(f, "Binary(\"{}\")", self),
+ ScalarValue::FixedSizeBinary(size, None) => {
+ write!(f, "FixedSizeBinary({}, {})", size, self)
+ }
+ ScalarValue::FixedSizeBinary(size, Some(_)) => {
+ write!(f, "FixedSizeBinary({}, \"{}\")", size, self)
+ }
ScalarValue::LargeBinary(None) => write!(f, "LargeBinary({})", self),
ScalarValue::LargeBinary(Some(_)) => write!(f, "LargeBinary(\"{}\")", self),
ScalarValue::List(_, _) => write!(f, "List([{}])", self),
diff --git a/datafusion/core/tests/parquet/test_binary.parquet b/datafusion/core/tests/parquet/test_binary.parquet
new file mode 100644
index 000000000..9d906bc49
Binary files /dev/null and b/datafusion/core/tests/parquet/test_binary.parquet differ
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 8bec4f1dd..c70466737 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -48,6 +48,28 @@ async fn parquet_query() {
assert_batches_eq!(expected, &actual);
}
+#[tokio::test]
+async fn fixed_size_binary_columns() {
+ let ctx = SessionContext::new();
+ ctx.register_parquet(
+ "t0",
+ "tests/parquet/test_binary.parquet",
+ ParquetReadOptions::default(),
+ )
+ .await
+ .unwrap();
+ let sql = "SELECT ids FROM t0 ORDER BY ids";
+ let plan = ctx.create_logical_plan(sql).unwrap();
+ let plan = ctx.optimize(&plan).unwrap();
+ let plan = ctx.create_physical_plan(&plan).await.unwrap();
+ let task_ctx = ctx.task_ctx();
+ let results = collect(plan, task_ctx).await.unwrap();
+ for batch in results {
+ assert_eq!(466, batch.num_rows());
+ assert_eq!(1, batch.num_columns());
+ }
+}
+
#[tokio::test]
async fn parquet_single_nan_schema() {
let ctx = SessionContext::new();
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index e2874da1f..f8dab779b 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -1063,6 +1063,11 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
Value::LargeBinaryValue(s.to_owned())
})
}
+ scalar::ScalarValue::FixedSizeBinary(_, _) => {
+ return Err(Error::General(
+ "FixedSizeBinary is not yet implemented".to_owned(),
+ ))
+ }
datafusion::scalar::ScalarValue::Time64(v) => {
create_proto_scalar(v, PrimitiveScalarType::Time64, |v| {