You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/07/18 17:28:38 UTC
[arrow-datafusion] branch master updated: Preserve field name in `ScalarValue::List` (#2893)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 90e5fd048 Preserve field name in `ScalarValue::List` (#2893)
90e5fd048 is described below
commit 90e5fd0480ca30d19d1b638e050e3a4da60df9cc
Author: comphead <co...@users.noreply.github.com>
AuthorDate: Mon Jul 18 10:28:33 2022 -0700
Preserve field name in `ScalarValue::List` (#2893)
* scalar list data type
* list element name
* remove unused code
* remove unused logging
* Re-enabled commented-out test
* Formatting
* merge
* Update datafusion/common/src/scalar.rs
* Update datafusion/proto/src/to_proto.rs
* Update datafusion/proto/src/to_proto.rs
* fix fmt
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/common/src/scalar.rs | 101 ++++++++------
.../physical-expr/src/aggregate/array_agg.rs | 38 ++---
.../src/aggregate/array_agg_distinct.rs | 40 +++---
.../physical-expr/src/aggregate/count_distinct.rs | 8 +-
.../physical-expr/src/aggregate/sum_distinct.rs | 6 +-
datafusion/physical-expr/src/aggregate/tdigest.rs | 9 +-
datafusion/proto/proto/datafusion.proto | 2 +-
datafusion/proto/src/from_proto.rs | 155 +--------------------
datafusion/proto/src/lib.rs | 120 ++++++++++------
datafusion/proto/src/to_proto.rs | 31 +++--
datafusion/sql/src/planner.rs | 4 +-
11 files changed, 219 insertions(+), 295 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index ba02aeb67..633a887e2 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -77,7 +77,7 @@ pub enum ScalarValue {
/// large binary
LargeBinary(Option<Vec<u8>>),
/// list of nested ScalarValue
- List(Option<Vec<ScalarValue>>, Box<DataType>),
+ List(Option<Vec<ScalarValue>>, Box<Field>),
/// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01
Date32(Option<i32>),
/// Date stored as a signed 64bit int milliseconds since UNIX epoch 1970-01-01
@@ -651,9 +651,9 @@ impl ScalarValue {
ScalarValue::LargeUtf8(_) => DataType::LargeUtf8,
ScalarValue::Binary(_) => DataType::Binary,
ScalarValue::LargeBinary(_) => DataType::LargeBinary,
- ScalarValue::List(_, data_type) => DataType::List(Box::new(Field::new(
+ ScalarValue::List(_, field) => DataType::List(Box::new(Field::new(
"item",
- data_type.as_ref().clone(),
+ field.data_type().clone(),
true,
))),
ScalarValue::Date32(_) => DataType::Date32,
@@ -1300,7 +1300,7 @@ impl ScalarValue {
.collect::<LargeBinaryArray>(),
),
},
- ScalarValue::List(values, data_type) => Arc::new(match data_type.as_ref() {
+ ScalarValue::List(values, field) => Arc::new(match field.data_type() {
DataType::Boolean => build_list!(BooleanBuilder, Boolean, values, size),
DataType::Int8 => build_list!(Int8Builder, Int8, values, size),
DataType::Int16 => build_list!(Int16Builder, Int16, values, size),
@@ -1323,7 +1323,7 @@ impl ScalarValue {
repeat(self.clone()).take(size),
&DataType::List(Box::new(Field::new(
"item",
- data_type.as_ref().clone(),
+ field.data_type().clone(),
true,
))),
)
@@ -1463,8 +1463,10 @@ impl ScalarValue {
Some(scalar_vec)
}
};
- let data_type = nested_type.data_type().clone();
- ScalarValue::List(value, Box::new(data_type))
+ ScalarValue::List(
+ value,
+ Box::new(Field::new("item", nested_type.data_type().clone(), true)),
+ )
}
DataType::Date32 => {
typed_cast!(array, index, Date32Array, Date32)
@@ -1564,8 +1566,10 @@ impl ScalarValue {
Some(scalar_vec)
}
};
- let data_type = nested_type.data_type().clone();
- ScalarValue::List(value, Box::new(data_type))
+ ScalarValue::List(
+ value,
+ Box::new(Field::new("item", nested_type.data_type().clone(), true)),
+ )
}
other => {
return Err(DataFusionError::NotImplemented(format!(
@@ -1898,9 +1902,10 @@ impl TryFrom<&DataType> for ScalarValue {
index_type.clone(),
Box::new(value_type.as_ref().try_into()?),
),
- DataType::List(ref nested_type) => {
- ScalarValue::List(None, Box::new(nested_type.data_type().clone()))
- }
+ DataType::List(ref nested_type) => ScalarValue::List(
+ None,
+ Box::new(Field::new("item", nested_type.data_type().clone(), true)),
+ ),
DataType::Struct(fields) => {
ScalarValue::Struct(None, Box::new(fields.clone()))
}
@@ -2248,8 +2253,11 @@ mod tests {
#[test]
fn scalar_list_null_to_array() {
- let list_array_ref =
- ScalarValue::List(None, Box::new(DataType::UInt64)).to_array();
+ let list_array_ref = ScalarValue::List(
+ None,
+ Box::new(Field::new("item", DataType::UInt64, false)),
+ )
+ .to_array();
let list_array = list_array_ref.as_any().downcast_ref::<ListArray>().unwrap();
assert!(list_array.is_null(0));
@@ -2265,7 +2273,7 @@ mod tests {
ScalarValue::UInt64(None),
ScalarValue::UInt64(Some(101)),
]),
- Box::new(DataType::UInt64),
+ Box::new(Field::new("item", DataType::UInt64, false)),
)
.to_array();
@@ -2747,11 +2755,11 @@ mod tests {
assert_eq!(
List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
)
.partial_cmp(&List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
)),
Some(Ordering::Equal)
);
@@ -2759,11 +2767,11 @@ mod tests {
assert_eq!(
List(
Some(vec![Int32(Some(10)), Int32(Some(5))]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
)
.partial_cmp(&List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
)),
Some(Ordering::Greater)
);
@@ -2771,11 +2779,11 @@ mod tests {
assert_eq!(
List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
)
.partial_cmp(&List(
Some(vec![Int32(Some(10)), Int32(Some(5))]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
)),
Some(Ordering::Less)
);
@@ -2784,11 +2792,11 @@ mod tests {
assert_eq!(
List(
Some(vec![Int64(Some(1)), Int64(Some(5))]),
- Box::new(DataType::Int64),
+ Box::new(Field::new("item", DataType::Int64, false)),
)
.partial_cmp(&List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
)),
None
);
@@ -3011,17 +3019,17 @@ mod tests {
ScalarValue::from(2i32),
ScalarValue::from(3i32),
]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
);
let l1 = ScalarValue::List(
Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
);
let l2 = ScalarValue::List(
Some(vec![ScalarValue::from(6i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, false)),
);
// Define struct scalars
@@ -3066,13 +3074,18 @@ mod tests {
// Define list-of-structs scalars
let nl0 = ScalarValue::List(
Some(vec![s0.clone(), s1.clone()]),
- Box::new(s0.get_datatype()),
+ Box::new(Field::new("item", s0.get_datatype(), true)),
);
- let nl1 = ScalarValue::List(Some(vec![s2]), Box::new(s0.get_datatype()));
-
- let nl2 = ScalarValue::List(Some(vec![s1]), Box::new(s0.get_datatype()));
+ let nl1 = ScalarValue::List(
+ Some(vec![s2]),
+ Box::new(Field::new("item", s0.get_datatype(), true)),
+ );
+ let nl2 = ScalarValue::List(
+ Some(vec![s1]),
+ Box::new(Field::new("item", s0.get_datatype(), true)),
+ );
// iter_to_array for list-of-struct
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
let array = array.as_any().downcast_ref::<ListArray>().unwrap();
@@ -3222,48 +3235,48 @@ mod tests {
ScalarValue::from(2i32),
ScalarValue::from(3i32),
]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
let l2 = ScalarValue::List(
Some(vec![
ScalarValue::List(
Some(vec![ScalarValue::from(6i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(7i32), ScalarValue::from(8i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
let l3 = ScalarValue::List(
Some(vec![ScalarValue::List(
Some(vec![ScalarValue::from(9i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
)]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();
diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs
index 2a40c8bad..eaed89390 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -150,7 +150,7 @@ impl Accumulator for ArrayAggAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::List(
Some(self.values.clone()),
- Box::new(self.datatype.clone()),
+ Box::new(Field::new("item", self.datatype.clone(), true)),
))
}
}
@@ -179,7 +179,7 @@ mod tests {
ScalarValue::Int32(Some(4)),
ScalarValue::Int32(Some(5)),
]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
);
generic_test_op!(a, DataType::Int32, ArrayAgg, list, DataType::Int32)
@@ -195,57 +195,57 @@ mod tests {
ScalarValue::from(2i32),
ScalarValue::from(3i32),
]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
let l2 = ScalarValue::List(
Some(vec![
ScalarValue::List(
Some(vec![ScalarValue::from(6i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(7i32), ScalarValue::from(8i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
let l3 = ScalarValue::List(
Some(vec![ScalarValue::List(
Some(vec![ScalarValue::from(9i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
)]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
let list = ScalarValue::List(
Some(vec![l1.clone(), l2.clone(), l3.clone()]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
index 9448683c0..44e24e93c 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -122,7 +122,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![ScalarValue::List(
Some(self.values.clone().into_iter().collect()),
- Box::new(self.datatype.clone()),
+ Box::new(Field::new("item", self.datatype.clone(), true)),
)])
}
@@ -153,7 +153,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::List(
Some(self.values.clone().into_iter().collect()),
- Box::new(self.datatype.clone()),
+ Box::new(Field::new("item", self.datatype.clone(), true)),
))
}
}
@@ -214,7 +214,7 @@ mod tests {
ScalarValue::Int32(Some(4)),
ScalarValue::Int32(Some(5)),
]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
);
check_distinct_array_agg(col, out, DataType::Int32)
@@ -231,18 +231,18 @@ mod tests {
ScalarValue::from(2i32),
ScalarValue::from(3i32),
]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
// [[6], [7, 8]]
@@ -250,40 +250,40 @@ mod tests {
Some(vec![
ScalarValue::List(
Some(vec![ScalarValue::from(6i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(7i32), ScalarValue::from(8i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
),
]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
// [[9]]
let l3 = ScalarValue::List(
Some(vec![ScalarValue::List(
Some(vec![ScalarValue::from(9i32)]),
- Box::new(DataType::Int32),
+ Box::new(Field::new("item", DataType::Int32, true)),
)]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
let list = ScalarValue::List(
Some(vec![l1.clone(), l2.clone(), l3.clone()]),
- Box::new(DataType::List(Box::new(Field::new(
+ Box::new(Field::new(
"item",
- DataType::Int32,
+ DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
- )))),
+ )),
);
// Duplicate l1 in the input array and check that it is deduped in the output.
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs
index 5b3ef852e..83c50b4cd 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -183,8 +183,10 @@ impl Accumulator for DistinctCountAccumulator {
.iter()
.map(|state_data_type| {
let values = Box::new(Vec::new());
- let data_type = Box::new(state_data_type.clone());
- ScalarValue::List(Some(*values), data_type)
+ ScalarValue::List(
+ Some(*values),
+ Box::new(Field::new("item", state_data_type.clone(), true)),
+ )
})
.collect::<Vec<_>>();
@@ -232,7 +234,7 @@ mod tests {
macro_rules! state_to_vec {
($LIST:expr, $DATA_TYPE:ident, $PRIM_TY:ty) => {{
match $LIST {
- ScalarValue::List(_, data_type) => match data_type.as_ref() {
+ ScalarValue::List(_, field) => match field.data_type() {
&DataType::$DATA_TYPE => (),
_ => panic!("Unexpected DataType for list"),
},
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index 29baffa44..ddca35775 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -133,11 +133,13 @@ impl Accumulator for DistinctSumAccumulator {
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
let mut distinct_values = Vec::new();
- let data_type = Box::new(self.data_type.clone());
self.hash_values
.iter()
.for_each(|distinct_value| distinct_values.push(distinct_value.clone()));
- vec![ScalarValue::List(Some(distinct_values), data_type)]
+ vec![ScalarValue::List(
+ Some(distinct_values),
+ Box::new(Field::new("item", self.data_type.clone(), true)),
+ )]
};
Ok(state_out)
}
diff --git a/datafusion/physical-expr/src/aggregate/tdigest.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs
index b875dc1ba..114eb185c 100644
--- a/datafusion/physical-expr/src/aggregate/tdigest.rs
+++ b/datafusion/physical-expr/src/aggregate/tdigest.rs
@@ -27,7 +27,7 @@
//! [TDigest sketch algorithm]: https://arxiv.org/abs/1902.04023
//! [Facebook's Folly TDigest]: https://github.com/facebook/folly/blob/main/folly/stats/TDigest.h
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, Field};
use datafusion_common::DataFusionError;
use datafusion_common::Result;
use datafusion_common::ScalarValue;
@@ -624,7 +624,10 @@ impl TDigest {
ScalarValue::Float64(Some(self.count.into_inner())),
ScalarValue::Float64(Some(self.max.into_inner())),
ScalarValue::Float64(Some(self.min.into_inner())),
- ScalarValue::List(Some(centroids), Box::new(DataType::Float64)),
+ ScalarValue::List(
+ Some(centroids),
+ Box::new(Field::new("item", DataType::Float64, true)),
+ ),
]
}
@@ -645,7 +648,7 @@ impl TDigest {
};
let centroids: Vec<_> = match &state[5] {
- ScalarValue::List(Some(c), d) if **d == DataType::Float64 => c
+ ScalarValue::List(Some(c), f) if *f.data_type() == DataType::Float64 => c
.chunks(2)
.map(|v| Centroid::new(cast_scalar_f64!(v[0]), cast_scalar_f64!(v[1])))
.collect(),
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 8896316fa..d7165c5d3 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -654,7 +654,7 @@ message Union{
}
message ScalarListValue{
- ScalarType datatype = 1;
+ Field field = 1;
repeated ScalarValue values = 2;
}
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index 279cb8e40..1cd6edfe8 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use crate::protobuf;
use crate::protobuf::plan_type::PlanTypeEnum::{
FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
OptimizedLogicalPlan, OptimizedPhysicalPlan,
};
+use crate::protobuf::{self};
use crate::protobuf::{
CubeNode, GroupingSetNode, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
RollupNode,
@@ -584,147 +584,6 @@ impl TryFrom<&protobuf::scalar_type::Datatype> for DataType {
}
}
-impl TryFrom<&protobuf::scalar_value::Value> for ScalarValue {
- type Error = Error;
-
- fn try_from(scalar: &protobuf::scalar_value::Value) -> Result<Self, Self::Error> {
- use protobuf::{scalar_value::Value, PrimitiveScalarType};
-
- let scalar = match scalar {
- Value::BoolValue(v) => ScalarValue::Boolean(Some(*v)),
- Value::Utf8Value(v) => ScalarValue::Utf8(Some(v.to_owned())),
- Value::LargeUtf8Value(v) => ScalarValue::LargeUtf8(Some(v.to_owned())),
- Value::Int8Value(v) => ScalarValue::Int8(Some(*v as i8)),
- Value::Int16Value(v) => ScalarValue::Int16(Some(*v as i16)),
- Value::Int32Value(v) => ScalarValue::Int32(Some(*v)),
- Value::Int64Value(v) => ScalarValue::Int64(Some(*v)),
- Value::Uint8Value(v) => ScalarValue::UInt8(Some(*v as u8)),
- Value::Uint16Value(v) => ScalarValue::UInt16(Some(*v as u16)),
- Value::Uint32Value(v) => ScalarValue::UInt32(Some(*v)),
- Value::Uint64Value(v) => ScalarValue::UInt64(Some(*v)),
- Value::Float32Value(v) => ScalarValue::Float32(Some(*v)),
- Value::Float64Value(v) => ScalarValue::Float64(Some(*v)),
- Value::Date32Value(v) => ScalarValue::Date32(Some(*v)),
- Value::ListValue(v) => v.try_into()?,
- Value::NullListValue(v) => ScalarValue::List(None, Box::new(v.try_into()?)),
- Value::NullValue(null_enum) => {
- let primitive = PrimitiveScalarType::try_from(null_enum)?;
- (&primitive).try_into()?
- }
- Value::Decimal128Value(val) => {
- let array = vec_to_array(val.value.clone());
- ScalarValue::Decimal128(
- Some(i128::from_be_bytes(array)),
- val.p as usize,
- val.s as usize,
- )
- }
- Value::Date64Value(v) => ScalarValue::Date64(Some(*v)),
- Value::IntervalYearmonthValue(v) => ScalarValue::IntervalYearMonth(Some(*v)),
- Value::IntervalDaytimeValue(v) => ScalarValue::IntervalDayTime(Some(*v)),
- Value::TimestampValue(v) => {
- let ts_value =
- v.value.as_ref().ok_or_else(|| Error::required("value"))?;
- let timezone = if v.timezone.is_empty() {
- None
- } else {
- Some(v.timezone.clone())
- };
- match ts_value {
- protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
- ScalarValue::TimestampMicrosecond(Some(*t), timezone)
- }
- protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
- ScalarValue::TimestampNanosecond(Some(*t), timezone)
- }
- protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
- ScalarValue::TimestampSecond(Some(*t), timezone)
- }
- protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
- ScalarValue::TimestampMillisecond(Some(*t), timezone)
- }
- }
- }
- };
- Ok(scalar)
- }
-}
-
-impl TryFrom<&protobuf::ScalarListValue> for ScalarValue {
- type Error = Error;
-
- fn try_from(
- scalar_list_value: &protobuf::ScalarListValue,
- ) -> Result<Self, Self::Error> {
- use protobuf::{scalar_type::Datatype, PrimitiveScalarType};
-
- let protobuf::ScalarListValue { datatype, values } = scalar_list_value;
- let pb_scalar_type = datatype
- .as_ref()
- .ok_or_else(|| Error::required("datatype"))?;
-
- let scalar_type = pb_scalar_type
- .datatype
- .as_ref()
- .ok_or_else(|| Error::required("datatype"))?;
- let scalar_values = match scalar_type {
- Datatype::Scalar(scalar_type_i32) => {
- let leaf_scalar_type =
- protobuf::PrimitiveScalarType::try_from(scalar_type_i32)?;
- let typechecked_values: Vec<datafusion::scalar::ScalarValue> = values
- .iter()
- .map(|protobuf::ScalarValue { value: opt_value }| {
- let value =
- opt_value.as_ref().ok_or_else(|| Error::required("value"))?;
- typechecked_scalar_value_conversion(value, leaf_scalar_type)
- })
- .collect::<Result<Vec<_>, _>>()?;
- ScalarValue::List(
- Some(typechecked_values),
- Box::new(leaf_scalar_type.into()),
- )
- }
- Datatype::List(list_type) => {
- let protobuf::ScalarListType {
- deepest_type,
- field_names,
- } = &list_type;
- let leaf_type = PrimitiveScalarType::try_from(deepest_type)?;
- let depth = field_names.len();
-
- let typechecked_values: Vec<ScalarValue> = if depth == 0 {
- return Err(Error::at_least_one("field_names"));
- } else if depth == 1 {
- values
- .iter()
- .map(|protobuf::ScalarValue { value: opt_value }| {
- let value = opt_value
- .as_ref()
- .ok_or_else(|| Error::required("value"))?;
- typechecked_scalar_value_conversion(value, leaf_type)
- })
- .collect::<Result<Vec<_>, _>>()?
- } else {
- values
- .iter()
- .map(|protobuf::ScalarValue { value: opt_value }| {
- opt_value.as_ref().required("value")
- })
- .collect::<Result<Vec<_>, _>>()?
- };
- ScalarValue::List(
- match typechecked_values.len() {
- 0 => None,
- _ => Some(typechecked_values),
- },
- Box::new((list_type).try_into()?),
- )
- }
- };
- Ok(scalar_values)
- }
-}
-
impl TryFrom<&protobuf::ScalarType> for DataType {
type Error = Error;
@@ -855,22 +714,22 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
values,
- datatype: opt_scalar_type,
+ field: opt_field,
} = &scalar_list;
- let scalar_type = opt_scalar_type.as_ref().required("datatype")?;
- let scalar_type = Box::new(scalar_type);
+ let field = opt_field.as_ref().required("field")?;
+ let field = Box::new(field);
let typechecked_values: Vec<ScalarValue> = values
.iter()
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()?;
- Self::List(Some(typechecked_values), scalar_type)
+ Self::List(Some(typechecked_values), field)
}
Value::NullListValue(v) => {
- let datatype = v.datatype.as_ref().required("datatype")?;
- Self::List(None, Box::new(datatype))
+ let field = Field::new("item", v.try_into()?, true);
+ Self::List(None, Box::new(field))
}
Value::NullValue(v) => {
let null_type_enum = protobuf::PrimitiveScalarType::try_from(v)?;
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 4683abd7a..1d7847df2 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -294,24 +294,36 @@ mod roundtrip_tests {
ScalarValue::Int16(None),
ScalarValue::Float32(Some(32.0)),
]),
- Box::new(DataType::List(new_box_field("item", DataType::Int16, true))),
+ new_box_field(
+ "item",
+ DataType::List(new_box_field("item", DataType::Int16, true)),
+ true,
+ ),
),
ScalarValue::List(
Some(vec![
ScalarValue::Float32(None),
ScalarValue::Float32(Some(32.0)),
]),
- Box::new(DataType::List(new_box_field("item", DataType::Int16, true))),
+ new_box_field(
+ "item",
+ DataType::List(new_box_field("item", DataType::Int16, true)),
+ true,
+ ),
),
ScalarValue::List(
Some(vec![
ScalarValue::List(
None,
- Box::new(DataType::List(new_box_field(
- "level2",
- DataType::Float32,
+ new_box_field(
+ "item",
+ DataType::List(new_box_field(
+ "level2",
+ DataType::Float32,
+ true,
+ )),
true,
- ))),
+ ),
),
ScalarValue::List(
Some(vec![
@@ -321,26 +333,38 @@ mod roundtrip_tests {
ScalarValue::Float32(Some(2.0)),
ScalarValue::Float32(Some(1.0)),
]),
- Box::new(DataType::List(new_box_field(
- "level2",
- DataType::Float32,
+ new_box_field(
+ "item",
+ DataType::List(new_box_field(
+ "level2",
+ DataType::Float32,
+ true,
+ )),
true,
- ))),
+ ),
),
ScalarValue::List(
None,
- Box::new(DataType::List(new_box_field(
- "lists are typed inconsistently",
- DataType::Int16,
+ new_box_field(
+ "item",
+ DataType::List(new_box_field(
+ "lists are typed inconsistently",
+ DataType::Int16,
+ true,
+ )),
true,
- ))),
+ ),
),
]),
- Box::new(DataType::List(new_box_field(
- "level1",
- DataType::List(new_box_field("level2", DataType::Float32, true)),
+ new_box_field(
+ "item",
+ DataType::List(new_box_field(
+ "level1",
+ DataType::List(new_box_field("level2", DataType::Float32, true)),
+ true,
+ )),
true,
- ))),
+ ),
),
];
@@ -373,7 +397,7 @@ mod roundtrip_tests {
ScalarValue::UInt64(None),
ScalarValue::Utf8(None),
ScalarValue::LargeUtf8(None),
- ScalarValue::List(None, Box::new(DataType::Boolean)),
+ ScalarValue::List(None, new_box_field("item", DataType::Boolean, true)),
ScalarValue::Date32(None),
ScalarValue::Boolean(Some(true)),
ScalarValue::Boolean(Some(false)),
@@ -437,21 +461,25 @@ mod roundtrip_tests {
ScalarValue::Float32(Some(2.0)),
ScalarValue::Float32(Some(1.0)),
]),
- Box::new(DataType::List(new_box_field(
- "level1",
- DataType::Float32,
+ new_box_field(
+ "item",
+ DataType::List(new_box_field("level1", DataType::Float32, true)),
true,
- ))),
+ ),
),
ScalarValue::List(
Some(vec![
ScalarValue::List(
None,
- Box::new(DataType::List(new_box_field(
- "level2",
- DataType::Float32,
+ new_box_field(
+ "item",
+ DataType::List(new_box_field(
+ "level2",
+ DataType::Float32,
+ true,
+ )),
true,
- ))),
+ ),
),
ScalarValue::List(
Some(vec![
@@ -461,18 +489,26 @@ mod roundtrip_tests {
ScalarValue::Float32(Some(2.0)),
ScalarValue::Float32(Some(1.0)),
]),
- Box::new(DataType::List(new_box_field(
- "level2",
- DataType::Float32,
+ new_box_field(
+ "item",
+ DataType::List(new_box_field(
+ "level2",
+ DataType::Float32,
+ true,
+ )),
true,
- ))),
+ ),
),
]),
- Box::new(DataType::List(new_box_field(
- "level1",
- DataType::List(new_box_field("level2", DataType::Float32, true)),
+ new_box_field(
+ "item",
+ DataType::List(new_box_field(
+ "level1",
+ DataType::List(new_box_field("level2", DataType::Float32, true)),
+ true,
+ )),
true,
- ))),
+ ),
),
];
@@ -624,9 +660,10 @@ mod roundtrip_tests {
];
for test_case in should_pass.into_iter() {
- let proto: super::protobuf::ScalarType = (&test_case).try_into().unwrap();
- let roundtrip: DataType = (&proto).try_into().unwrap();
- assert_eq!(format!("{:?}", test_case), format!("{:?}", roundtrip));
+ let field = Field::new("item", test_case, true);
+ let proto: super::protobuf::Field = (&field).try_into().unwrap();
+ let roundtrip: Field = (&proto).try_into().unwrap();
+ assert_eq!(format!("{:?}", field), format!("{:?}", roundtrip));
}
let mut success: Vec<DataType> = Vec::new();
@@ -634,7 +671,7 @@ mod roundtrip_tests {
let proto: std::result::Result<
super::protobuf::ScalarType,
super::to_proto::Error,
- > = (&test_case).try_into();
+ > = (&Field::new("item", test_case.clone(), true)).try_into();
if proto.is_ok() {
success.push(test_case)
}
@@ -803,7 +840,10 @@ mod roundtrip_tests {
ScalarValue::Date32(None),
ScalarValue::TimestampMicrosecond(None, None),
ScalarValue::TimestampNanosecond(None, None),
- // ScalarValue::List(None, DataType::Boolean)
+ ScalarValue::List(
+ None,
+ Box::new(Field::new("item", DataType::Boolean, false)),
+ ),
];
for test_case in test_types.into_iter() {
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index afe24ea89..d5ba5c38a 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -835,25 +835,28 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
Value::LargeUtf8Value(s.to_owned())
})
}
- scalar::ScalarValue::List(value, datatype) => {
- println!("Current datatype of list: {:?}", datatype);
+ scalar::ScalarValue::List(value, boxed_field) => {
+ println!("Current field of list: {:?}", boxed_field);
match value {
Some(values) => {
if values.is_empty() {
protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::ListValue(
protobuf::ScalarListValue {
- datatype: Some(datatype.as_ref().try_into()?),
+ field: Some(boxed_field.as_ref().into()),
values: Vec::new(),
},
)),
}
} else {
- let scalar_type = match datatype.as_ref() {
+ let scalar_type = match boxed_field.data_type() {
DataType::List(field) => field.as_ref().data_type(),
- _ => todo!("Proper error handling"),
+ unsupported => {
+ todo!("Proper error handling {}", unsupported)
+ }
};
println!("Current scalar type for list: {:?}", scalar_type);
+
let type_checked_values: Vec<protobuf::ScalarValue> = values
.iter()
.map(|scalar| match (scalar, scalar_type) {
@@ -862,7 +865,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
DataType::List(field),
) => {
if let DataType::List(list_field) =
- list_type.as_ref()
+ list_type.data_type()
{
let scalar_datatype = field.data_type();
let list_datatype = list_field.data_type();
@@ -879,7 +882,8 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
scalar.try_into()
} else {
Err(Error::inconsistent_list_designated(
- scalar, datatype,
+ scalar,
+ boxed_field.data_type(),
))
}
}
@@ -930,14 +934,15 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
DataType::LargeUtf8,
) => scalar.try_into(),
_ => Err(Error::inconsistent_list_designated(
- scalar, datatype,
+ scalar,
+ boxed_field.data_type(),
)),
})
.collect::<Result<Vec<_>, _>>()?;
protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::ListValue(
protobuf::ScalarListValue {
- datatype: Some(datatype.as_ref().try_into()?),
+ field: Some(boxed_field.as_ref().into()),
values: type_checked_values,
},
)),
@@ -946,7 +951,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
}
None => protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::NullListValue(
- datatype.as_ref().try_into()?,
+ boxed_field.as_ref().try_into()?,
)),
},
}
@@ -1121,11 +1126,11 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
}
}
-impl TryFrom<&DataType> for protobuf::ScalarType {
+impl TryFrom<&Field> for protobuf::ScalarType {
type Error = Error;
- fn try_from(value: &DataType) -> Result<Self, Self::Error> {
- let datatype = protobuf::scalar_type::Datatype::try_from(value)?;
+ fn try_from(value: &Field) -> Result<Self, Self::Error> {
+ let datatype = protobuf::scalar_type::Datatype::try_from(value.data_type())?;
Ok(Self {
datatype: Some(datatype),
})
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index edabbf8f2..39b20b2e0 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -2501,7 +2501,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if data_types.is_empty() {
Ok(Expr::Literal(ScalarValue::List(
None,
- Box::new(DataType::Utf8),
+ Box::new(Field::new("item", DataType::Utf8, true)),
)))
} else if data_types.len() > 1 {
Err(DataFusionError::NotImplemented(format!(
@@ -2513,7 +2513,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Ok(Expr::Literal(ScalarValue::List(
Some(values),
- Box::new(data_type),
+ Box::new(Field::new("item", data_type, true)),
)))
}
}