You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/07/18 17:28:38 UTC

[arrow-datafusion] branch master updated: Preserve field name in `ScalarValue::List` (#2893)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 90e5fd048 Preserve field name in `ScalarValue::List` (#2893)
90e5fd048 is described below

commit 90e5fd0480ca30d19d1b638e050e3a4da60df9cc
Author: comphead <co...@users.noreply.github.com>
AuthorDate: Mon Jul 18 10:28:33 2022 -0700

    Preserve field name in `ScalarValue::List` (#2893)
    
    * scalar list data type
    
    * list element name
    
    * remove unused code
    
    * remove unused logging
    
    * Re-enabled commented-out test
    
    * Formatting
    
    * merge
    
    * Update datafusion/common/src/scalar.rs
    
    * Update datafusion/proto/src/to_proto.rs
    
    * Update datafusion/proto/src/to_proto.rs
    
    * fix fmt
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/common/src/scalar.rs                    | 101 ++++++++------
 .../physical-expr/src/aggregate/array_agg.rs       |  38 ++---
 .../src/aggregate/array_agg_distinct.rs            |  40 +++---
 .../physical-expr/src/aggregate/count_distinct.rs  |   8 +-
 .../physical-expr/src/aggregate/sum_distinct.rs    |   6 +-
 datafusion/physical-expr/src/aggregate/tdigest.rs  |   9 +-
 datafusion/proto/proto/datafusion.proto            |   2 +-
 datafusion/proto/src/from_proto.rs                 | 155 +--------------------
 datafusion/proto/src/lib.rs                        | 120 ++++++++++------
 datafusion/proto/src/to_proto.rs                   |  31 +++--
 datafusion/sql/src/planner.rs                      |   4 +-
 11 files changed, 219 insertions(+), 295 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index ba02aeb67..633a887e2 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -77,7 +77,7 @@ pub enum ScalarValue {
     /// large binary
     LargeBinary(Option<Vec<u8>>),
     /// list of nested ScalarValue
-    List(Option<Vec<ScalarValue>>, Box<DataType>),
+    List(Option<Vec<ScalarValue>>, Box<Field>),
     /// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01
     Date32(Option<i32>),
     /// Date stored as a signed 64bit int milliseconds since UNIX epoch 1970-01-01
@@ -651,9 +651,9 @@ impl ScalarValue {
             ScalarValue::LargeUtf8(_) => DataType::LargeUtf8,
             ScalarValue::Binary(_) => DataType::Binary,
             ScalarValue::LargeBinary(_) => DataType::LargeBinary,
-            ScalarValue::List(_, data_type) => DataType::List(Box::new(Field::new(
+            ScalarValue::List(_, field) => DataType::List(Box::new(Field::new(
                 "item",
-                data_type.as_ref().clone(),
+                field.data_type().clone(),
                 true,
             ))),
             ScalarValue::Date32(_) => DataType::Date32,
@@ -1300,7 +1300,7 @@ impl ScalarValue {
                         .collect::<LargeBinaryArray>(),
                 ),
             },
-            ScalarValue::List(values, data_type) => Arc::new(match data_type.as_ref() {
+            ScalarValue::List(values, field) => Arc::new(match field.data_type() {
                 DataType::Boolean => build_list!(BooleanBuilder, Boolean, values, size),
                 DataType::Int8 => build_list!(Int8Builder, Int8, values, size),
                 DataType::Int16 => build_list!(Int16Builder, Int16, values, size),
@@ -1323,7 +1323,7 @@ impl ScalarValue {
                     repeat(self.clone()).take(size),
                     &DataType::List(Box::new(Field::new(
                         "item",
-                        data_type.as_ref().clone(),
+                        field.data_type().clone(),
                         true,
                     ))),
                 )
@@ -1463,8 +1463,10 @@ impl ScalarValue {
                         Some(scalar_vec)
                     }
                 };
-                let data_type = nested_type.data_type().clone();
-                ScalarValue::List(value, Box::new(data_type))
+                ScalarValue::List(
+                    value,
+                    Box::new(Field::new("item", nested_type.data_type().clone(), true)),
+                )
             }
             DataType::Date32 => {
                 typed_cast!(array, index, Date32Array, Date32)
@@ -1564,8 +1566,10 @@ impl ScalarValue {
                         Some(scalar_vec)
                     }
                 };
-                let data_type = nested_type.data_type().clone();
-                ScalarValue::List(value, Box::new(data_type))
+                ScalarValue::List(
+                    value,
+                    Box::new(Field::new("item", nested_type.data_type().clone(), true)),
+                )
             }
             other => {
                 return Err(DataFusionError::NotImplemented(format!(
@@ -1898,9 +1902,10 @@ impl TryFrom<&DataType> for ScalarValue {
                 index_type.clone(),
                 Box::new(value_type.as_ref().try_into()?),
             ),
-            DataType::List(ref nested_type) => {
-                ScalarValue::List(None, Box::new(nested_type.data_type().clone()))
-            }
+            DataType::List(ref nested_type) => ScalarValue::List(
+                None,
+                Box::new(Field::new("item", nested_type.data_type().clone(), true)),
+            ),
             DataType::Struct(fields) => {
                 ScalarValue::Struct(None, Box::new(fields.clone()))
             }
@@ -2248,8 +2253,11 @@ mod tests {
 
     #[test]
     fn scalar_list_null_to_array() {
-        let list_array_ref =
-            ScalarValue::List(None, Box::new(DataType::UInt64)).to_array();
+        let list_array_ref = ScalarValue::List(
+            None,
+            Box::new(Field::new("item", DataType::UInt64, false)),
+        )
+        .to_array();
         let list_array = list_array_ref.as_any().downcast_ref::<ListArray>().unwrap();
 
         assert!(list_array.is_null(0));
@@ -2265,7 +2273,7 @@ mod tests {
                 ScalarValue::UInt64(None),
                 ScalarValue::UInt64(Some(101)),
             ]),
-            Box::new(DataType::UInt64),
+            Box::new(Field::new("item", DataType::UInt64, false)),
         )
         .to_array();
 
@@ -2747,11 +2755,11 @@ mod tests {
         assert_eq!(
             List(
                 Some(vec![Int32(Some(1)), Int32(Some(5))]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, false)),
             )
             .partial_cmp(&List(
                 Some(vec![Int32(Some(1)), Int32(Some(5))]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, false)),
             )),
             Some(Ordering::Equal)
         );
@@ -2759,11 +2767,11 @@ mod tests {
         assert_eq!(
             List(
                 Some(vec![Int32(Some(10)), Int32(Some(5))]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, false)),
             )
             .partial_cmp(&List(
                 Some(vec![Int32(Some(1)), Int32(Some(5))]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, false)),
             )),
             Some(Ordering::Greater)
         );
@@ -2771,11 +2779,11 @@ mod tests {
         assert_eq!(
             List(
                 Some(vec![Int32(Some(1)), Int32(Some(5))]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, false)),
             )
             .partial_cmp(&List(
                 Some(vec![Int32(Some(10)), Int32(Some(5))]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, false)),
             )),
             Some(Ordering::Less)
         );
@@ -2784,11 +2792,11 @@ mod tests {
         assert_eq!(
             List(
                 Some(vec![Int64(Some(1)), Int64(Some(5))]),
-                Box::new(DataType::Int64),
+                Box::new(Field::new("item", DataType::Int64, false)),
             )
             .partial_cmp(&List(
                 Some(vec![Int32(Some(1)), Int32(Some(5))]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, false)),
             )),
             None
         );
@@ -3011,17 +3019,17 @@ mod tests {
                 ScalarValue::from(2i32),
                 ScalarValue::from(3i32),
             ]),
-            Box::new(DataType::Int32),
+            Box::new(Field::new("item", DataType::Int32, false)),
         );
 
         let l1 = ScalarValue::List(
             Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
-            Box::new(DataType::Int32),
+            Box::new(Field::new("item", DataType::Int32, false)),
         );
 
         let l2 = ScalarValue::List(
             Some(vec![ScalarValue::from(6i32)]),
-            Box::new(DataType::Int32),
+            Box::new(Field::new("item", DataType::Int32, false)),
         );
 
         // Define struct scalars
@@ -3066,13 +3074,18 @@ mod tests {
         // Define list-of-structs scalars
         let nl0 = ScalarValue::List(
             Some(vec![s0.clone(), s1.clone()]),
-            Box::new(s0.get_datatype()),
+            Box::new(Field::new("item", s0.get_datatype(), true)),
         );
 
-        let nl1 = ScalarValue::List(Some(vec![s2]), Box::new(s0.get_datatype()));
-
-        let nl2 = ScalarValue::List(Some(vec![s1]), Box::new(s0.get_datatype()));
+        let nl1 = ScalarValue::List(
+            Some(vec![s2]),
+            Box::new(Field::new("item", s0.get_datatype(), true)),
+        );
 
+        let nl2 = ScalarValue::List(
+            Some(vec![s1]),
+            Box::new(Field::new("item", s0.get_datatype(), true)),
+        );
         // iter_to_array for list-of-struct
         let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
         let array = array.as_any().downcast_ref::<ListArray>().unwrap();
@@ -3222,48 +3235,48 @@ mod tests {
                         ScalarValue::from(2i32),
                         ScalarValue::from(3i32),
                     ]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
                 ScalarValue::List(
                     Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
             ]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         let l2 = ScalarValue::List(
             Some(vec![
                 ScalarValue::List(
                     Some(vec![ScalarValue::from(6i32)]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
                 ScalarValue::List(
                     Some(vec![ScalarValue::from(7i32), ScalarValue::from(8i32)]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
             ]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         let l3 = ScalarValue::List(
             Some(vec![ScalarValue::List(
                 Some(vec![ScalarValue::from(9i32)]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, true)),
             )]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();
diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs
index 2a40c8bad..eaed89390 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -150,7 +150,7 @@ impl Accumulator for ArrayAggAccumulator {
     fn evaluate(&self) -> Result<ScalarValue> {
         Ok(ScalarValue::List(
             Some(self.values.clone()),
-            Box::new(self.datatype.clone()),
+            Box::new(Field::new("item", self.datatype.clone(), true)),
         ))
     }
 }
@@ -179,7 +179,7 @@ mod tests {
                 ScalarValue::Int32(Some(4)),
                 ScalarValue::Int32(Some(5)),
             ]),
-            Box::new(DataType::Int32),
+            Box::new(Field::new("item", DataType::Int32, true)),
         );
 
         generic_test_op!(a, DataType::Int32, ArrayAgg, list, DataType::Int32)
@@ -195,57 +195,57 @@ mod tests {
                         ScalarValue::from(2i32),
                         ScalarValue::from(3i32),
                     ]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
                 ScalarValue::List(
                     Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
             ]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         let l2 = ScalarValue::List(
             Some(vec![
                 ScalarValue::List(
                     Some(vec![ScalarValue::from(6i32)]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
                 ScalarValue::List(
                     Some(vec![ScalarValue::from(7i32), ScalarValue::from(8i32)]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
             ]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         let l3 = ScalarValue::List(
             Some(vec![ScalarValue::List(
                 Some(vec![ScalarValue::from(9i32)]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, true)),
             )]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         let list = ScalarValue::List(
             Some(vec![l1.clone(), l2.clone(), l3.clone()]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
index 9448683c0..44e24e93c 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -122,7 +122,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
         Ok(vec![ScalarValue::List(
             Some(self.values.clone().into_iter().collect()),
-            Box::new(self.datatype.clone()),
+            Box::new(Field::new("item", self.datatype.clone(), true)),
         )])
     }
 
@@ -153,7 +153,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
     fn evaluate(&self) -> Result<ScalarValue> {
         Ok(ScalarValue::List(
             Some(self.values.clone().into_iter().collect()),
-            Box::new(self.datatype.clone()),
+            Box::new(Field::new("item", self.datatype.clone(), true)),
         ))
     }
 }
@@ -214,7 +214,7 @@ mod tests {
                 ScalarValue::Int32(Some(4)),
                 ScalarValue::Int32(Some(5)),
             ]),
-            Box::new(DataType::Int32),
+            Box::new(Field::new("item", DataType::Int32, true)),
         );
 
         check_distinct_array_agg(col, out, DataType::Int32)
@@ -231,18 +231,18 @@ mod tests {
                         ScalarValue::from(2i32),
                         ScalarValue::from(3i32),
                     ]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
                 ScalarValue::List(
                     Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
             ]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         // [[6], [7, 8]]
@@ -250,40 +250,40 @@ mod tests {
             Some(vec![
                 ScalarValue::List(
                     Some(vec![ScalarValue::from(6i32)]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
                 ScalarValue::List(
                     Some(vec![ScalarValue::from(7i32), ScalarValue::from(8i32)]),
-                    Box::new(DataType::Int32),
+                    Box::new(Field::new("item", DataType::Int32, true)),
                 ),
             ]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         // [[9]]
         let l3 = ScalarValue::List(
             Some(vec![ScalarValue::List(
                 Some(vec![ScalarValue::from(9i32)]),
-                Box::new(DataType::Int32),
+                Box::new(Field::new("item", DataType::Int32, true)),
             )]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         let list = ScalarValue::List(
             Some(vec![l1.clone(), l2.clone(), l3.clone()]),
-            Box::new(DataType::List(Box::new(Field::new(
+            Box::new(Field::new(
                 "item",
-                DataType::Int32,
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
                 true,
-            )))),
+            )),
         );
 
         // Duplicate l1 in the input array and check that it is deduped in the output.
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs
index 5b3ef852e..83c50b4cd 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -183,8 +183,10 @@ impl Accumulator for DistinctCountAccumulator {
             .iter()
             .map(|state_data_type| {
                 let values = Box::new(Vec::new());
-                let data_type = Box::new(state_data_type.clone());
-                ScalarValue::List(Some(*values), data_type)
+                ScalarValue::List(
+                    Some(*values),
+                    Box::new(Field::new("item", state_data_type.clone(), true)),
+                )
             })
             .collect::<Vec<_>>();
 
@@ -232,7 +234,7 @@ mod tests {
     macro_rules! state_to_vec {
         ($LIST:expr, $DATA_TYPE:ident, $PRIM_TY:ty) => {{
             match $LIST {
-                ScalarValue::List(_, data_type) => match data_type.as_ref() {
+                ScalarValue::List(_, field) => match field.data_type() {
                     &DataType::$DATA_TYPE => (),
                     _ => panic!("Unexpected DataType for list"),
                 },
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index 29baffa44..ddca35775 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -133,11 +133,13 @@ impl Accumulator for DistinctSumAccumulator {
         // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
         let state_out = {
             let mut distinct_values = Vec::new();
-            let data_type = Box::new(self.data_type.clone());
             self.hash_values
                 .iter()
                 .for_each(|distinct_value| distinct_values.push(distinct_value.clone()));
-            vec![ScalarValue::List(Some(distinct_values), data_type)]
+            vec![ScalarValue::List(
+                Some(distinct_values),
+                Box::new(Field::new("item", self.data_type.clone(), true)),
+            )]
         };
         Ok(state_out)
     }
diff --git a/datafusion/physical-expr/src/aggregate/tdigest.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs
index b875dc1ba..114eb185c 100644
--- a/datafusion/physical-expr/src/aggregate/tdigest.rs
+++ b/datafusion/physical-expr/src/aggregate/tdigest.rs
@@ -27,7 +27,7 @@
 //! [TDigest sketch algorithm]: https://arxiv.org/abs/1902.04023
 //! [Facebook's Folly TDigest]: https://github.com/facebook/folly/blob/main/folly/stats/TDigest.h
 
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, Field};
 use datafusion_common::DataFusionError;
 use datafusion_common::Result;
 use datafusion_common::ScalarValue;
@@ -624,7 +624,10 @@ impl TDigest {
             ScalarValue::Float64(Some(self.count.into_inner())),
             ScalarValue::Float64(Some(self.max.into_inner())),
             ScalarValue::Float64(Some(self.min.into_inner())),
-            ScalarValue::List(Some(centroids), Box::new(DataType::Float64)),
+            ScalarValue::List(
+                Some(centroids),
+                Box::new(Field::new("item", DataType::Float64, true)),
+            ),
         ]
     }
 
@@ -645,7 +648,7 @@ impl TDigest {
         };
 
         let centroids: Vec<_> = match &state[5] {
-            ScalarValue::List(Some(c), d) if **d == DataType::Float64 => c
+            ScalarValue::List(Some(c), f) if *f.data_type() == DataType::Float64 => c
                 .chunks(2)
                 .map(|v| Centroid::new(cast_scalar_f64!(v[0]), cast_scalar_f64!(v[1])))
                 .collect(),
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 8896316fa..d7165c5d3 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -654,7 +654,7 @@ message Union{
 }
 
 message ScalarListValue{
-    ScalarType datatype = 1;
+    Field field = 1;
     repeated ScalarValue values = 2;
 }
 
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index 279cb8e40..1cd6edfe8 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::protobuf;
 use crate::protobuf::plan_type::PlanTypeEnum::{
     FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
     OptimizedLogicalPlan, OptimizedPhysicalPlan,
 };
+use crate::protobuf::{self};
 use crate::protobuf::{
     CubeNode, GroupingSetNode, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
     RollupNode,
@@ -584,147 +584,6 @@ impl TryFrom<&protobuf::scalar_type::Datatype> for DataType {
     }
 }
 
-impl TryFrom<&protobuf::scalar_value::Value> for ScalarValue {
-    type Error = Error;
-
-    fn try_from(scalar: &protobuf::scalar_value::Value) -> Result<Self, Self::Error> {
-        use protobuf::{scalar_value::Value, PrimitiveScalarType};
-
-        let scalar = match scalar {
-            Value::BoolValue(v) => ScalarValue::Boolean(Some(*v)),
-            Value::Utf8Value(v) => ScalarValue::Utf8(Some(v.to_owned())),
-            Value::LargeUtf8Value(v) => ScalarValue::LargeUtf8(Some(v.to_owned())),
-            Value::Int8Value(v) => ScalarValue::Int8(Some(*v as i8)),
-            Value::Int16Value(v) => ScalarValue::Int16(Some(*v as i16)),
-            Value::Int32Value(v) => ScalarValue::Int32(Some(*v)),
-            Value::Int64Value(v) => ScalarValue::Int64(Some(*v)),
-            Value::Uint8Value(v) => ScalarValue::UInt8(Some(*v as u8)),
-            Value::Uint16Value(v) => ScalarValue::UInt16(Some(*v as u16)),
-            Value::Uint32Value(v) => ScalarValue::UInt32(Some(*v)),
-            Value::Uint64Value(v) => ScalarValue::UInt64(Some(*v)),
-            Value::Float32Value(v) => ScalarValue::Float32(Some(*v)),
-            Value::Float64Value(v) => ScalarValue::Float64(Some(*v)),
-            Value::Date32Value(v) => ScalarValue::Date32(Some(*v)),
-            Value::ListValue(v) => v.try_into()?,
-            Value::NullListValue(v) => ScalarValue::List(None, Box::new(v.try_into()?)),
-            Value::NullValue(null_enum) => {
-                let primitive = PrimitiveScalarType::try_from(null_enum)?;
-                (&primitive).try_into()?
-            }
-            Value::Decimal128Value(val) => {
-                let array = vec_to_array(val.value.clone());
-                ScalarValue::Decimal128(
-                    Some(i128::from_be_bytes(array)),
-                    val.p as usize,
-                    val.s as usize,
-                )
-            }
-            Value::Date64Value(v) => ScalarValue::Date64(Some(*v)),
-            Value::IntervalYearmonthValue(v) => ScalarValue::IntervalYearMonth(Some(*v)),
-            Value::IntervalDaytimeValue(v) => ScalarValue::IntervalDayTime(Some(*v)),
-            Value::TimestampValue(v) => {
-                let ts_value =
-                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
-                let timezone = if v.timezone.is_empty() {
-                    None
-                } else {
-                    Some(v.timezone.clone())
-                };
-                match ts_value {
-                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
-                        ScalarValue::TimestampMicrosecond(Some(*t), timezone)
-                    }
-                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
-                        ScalarValue::TimestampNanosecond(Some(*t), timezone)
-                    }
-                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
-                        ScalarValue::TimestampSecond(Some(*t), timezone)
-                    }
-                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
-                        ScalarValue::TimestampMillisecond(Some(*t), timezone)
-                    }
-                }
-            }
-        };
-        Ok(scalar)
-    }
-}
-
-impl TryFrom<&protobuf::ScalarListValue> for ScalarValue {
-    type Error = Error;
-
-    fn try_from(
-        scalar_list_value: &protobuf::ScalarListValue,
-    ) -> Result<Self, Self::Error> {
-        use protobuf::{scalar_type::Datatype, PrimitiveScalarType};
-
-        let protobuf::ScalarListValue { datatype, values } = scalar_list_value;
-        let pb_scalar_type = datatype
-            .as_ref()
-            .ok_or_else(|| Error::required("datatype"))?;
-
-        let scalar_type = pb_scalar_type
-            .datatype
-            .as_ref()
-            .ok_or_else(|| Error::required("datatype"))?;
-        let scalar_values = match scalar_type {
-            Datatype::Scalar(scalar_type_i32) => {
-                let leaf_scalar_type =
-                    protobuf::PrimitiveScalarType::try_from(scalar_type_i32)?;
-                let typechecked_values: Vec<datafusion::scalar::ScalarValue> = values
-                    .iter()
-                    .map(|protobuf::ScalarValue { value: opt_value }| {
-                        let value =
-                            opt_value.as_ref().ok_or_else(|| Error::required("value"))?;
-                        typechecked_scalar_value_conversion(value, leaf_scalar_type)
-                    })
-                    .collect::<Result<Vec<_>, _>>()?;
-                ScalarValue::List(
-                    Some(typechecked_values),
-                    Box::new(leaf_scalar_type.into()),
-                )
-            }
-            Datatype::List(list_type) => {
-                let protobuf::ScalarListType {
-                    deepest_type,
-                    field_names,
-                } = &list_type;
-                let leaf_type = PrimitiveScalarType::try_from(deepest_type)?;
-                let depth = field_names.len();
-
-                let typechecked_values: Vec<ScalarValue> = if depth == 0 {
-                    return Err(Error::at_least_one("field_names"));
-                } else if depth == 1 {
-                    values
-                        .iter()
-                        .map(|protobuf::ScalarValue { value: opt_value }| {
-                            let value = opt_value
-                                .as_ref()
-                                .ok_or_else(|| Error::required("value"))?;
-                            typechecked_scalar_value_conversion(value, leaf_type)
-                        })
-                        .collect::<Result<Vec<_>, _>>()?
-                } else {
-                    values
-                        .iter()
-                        .map(|protobuf::ScalarValue { value: opt_value }| {
-                            opt_value.as_ref().required("value")
-                        })
-                        .collect::<Result<Vec<_>, _>>()?
-                };
-                ScalarValue::List(
-                    match typechecked_values.len() {
-                        0 => None,
-                        _ => Some(typechecked_values),
-                    },
-                    Box::new((list_type).try_into()?),
-                )
-            }
-        };
-        Ok(scalar_values)
-    }
-}
-
 impl TryFrom<&protobuf::ScalarType> for DataType {
     type Error = Error;
 
@@ -855,22 +714,22 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
             Value::ListValue(scalar_list) => {
                 let protobuf::ScalarListValue {
                     values,
-                    datatype: opt_scalar_type,
+                    field: opt_field,
                 } = &scalar_list;
 
-                let scalar_type = opt_scalar_type.as_ref().required("datatype")?;
-                let scalar_type = Box::new(scalar_type);
+                let field = opt_field.as_ref().required("field")?;
+                let field = Box::new(field);
 
                 let typechecked_values: Vec<ScalarValue> = values
                     .iter()
                     .map(|val| val.try_into())
                     .collect::<Result<Vec<_>, _>>()?;
 
-                Self::List(Some(typechecked_values), scalar_type)
+                Self::List(Some(typechecked_values), field)
             }
             Value::NullListValue(v) => {
-                let datatype = v.datatype.as_ref().required("datatype")?;
-                Self::List(None, Box::new(datatype))
+                let field = Field::new("item", v.try_into()?, true);
+                Self::List(None, Box::new(field))
             }
             Value::NullValue(v) => {
                 let null_type_enum = protobuf::PrimitiveScalarType::try_from(v)?;
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 4683abd7a..1d7847df2 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -294,24 +294,36 @@ mod roundtrip_tests {
                     ScalarValue::Int16(None),
                     ScalarValue::Float32(Some(32.0)),
                 ]),
-                Box::new(DataType::List(new_box_field("item", DataType::Int16, true))),
+                new_box_field(
+                    "item",
+                    DataType::List(new_box_field("item", DataType::Int16, true)),
+                    true,
+                ),
             ),
             ScalarValue::List(
                 Some(vec![
                     ScalarValue::Float32(None),
                     ScalarValue::Float32(Some(32.0)),
                 ]),
-                Box::new(DataType::List(new_box_field("item", DataType::Int16, true))),
+                new_box_field(
+                    "item",
+                    DataType::List(new_box_field("item", DataType::Int16, true)),
+                    true,
+                ),
             ),
             ScalarValue::List(
                 Some(vec![
                     ScalarValue::List(
                         None,
-                        Box::new(DataType::List(new_box_field(
-                            "level2",
-                            DataType::Float32,
+                        new_box_field(
+                            "item",
+                            DataType::List(new_box_field(
+                                "level2",
+                                DataType::Float32,
+                                true,
+                            )),
                             true,
-                        ))),
+                        ),
                     ),
                     ScalarValue::List(
                         Some(vec![
@@ -321,26 +333,38 @@ mod roundtrip_tests {
                             ScalarValue::Float32(Some(2.0)),
                             ScalarValue::Float32(Some(1.0)),
                         ]),
-                        Box::new(DataType::List(new_box_field(
-                            "level2",
-                            DataType::Float32,
+                        new_box_field(
+                            "item",
+                            DataType::List(new_box_field(
+                                "level2",
+                                DataType::Float32,
+                                true,
+                            )),
                             true,
-                        ))),
+                        ),
                     ),
                     ScalarValue::List(
                         None,
-                        Box::new(DataType::List(new_box_field(
-                            "lists are typed inconsistently",
-                            DataType::Int16,
+                        new_box_field(
+                            "item",
+                            DataType::List(new_box_field(
+                                "lists are typed inconsistently",
+                                DataType::Int16,
+                                true,
+                            )),
                             true,
-                        ))),
+                        ),
                     ),
                 ]),
-                Box::new(DataType::List(new_box_field(
-                    "level1",
-                    DataType::List(new_box_field("level2", DataType::Float32, true)),
+                new_box_field(
+                    "item",
+                    DataType::List(new_box_field(
+                        "level1",
+                        DataType::List(new_box_field("level2", DataType::Float32, true)),
+                        true,
+                    )),
                     true,
-                ))),
+                ),
             ),
         ];
 
@@ -373,7 +397,7 @@ mod roundtrip_tests {
             ScalarValue::UInt64(None),
             ScalarValue::Utf8(None),
             ScalarValue::LargeUtf8(None),
-            ScalarValue::List(None, Box::new(DataType::Boolean)),
+            ScalarValue::List(None, new_box_field("item", DataType::Boolean, true)),
             ScalarValue::Date32(None),
             ScalarValue::Boolean(Some(true)),
             ScalarValue::Boolean(Some(false)),
@@ -437,21 +461,25 @@ mod roundtrip_tests {
                     ScalarValue::Float32(Some(2.0)),
                     ScalarValue::Float32(Some(1.0)),
                 ]),
-                Box::new(DataType::List(new_box_field(
-                    "level1",
-                    DataType::Float32,
+                new_box_field(
+                    "item",
+                    DataType::List(new_box_field("level1", DataType::Float32, true)),
                     true,
-                ))),
+                ),
             ),
             ScalarValue::List(
                 Some(vec![
                     ScalarValue::List(
                         None,
-                        Box::new(DataType::List(new_box_field(
-                            "level2",
-                            DataType::Float32,
+                        new_box_field(
+                            "item",
+                            DataType::List(new_box_field(
+                                "level2",
+                                DataType::Float32,
+                                true,
+                            )),
                             true,
-                        ))),
+                        ),
                     ),
                     ScalarValue::List(
                         Some(vec![
@@ -461,18 +489,26 @@ mod roundtrip_tests {
                             ScalarValue::Float32(Some(2.0)),
                             ScalarValue::Float32(Some(1.0)),
                         ]),
-                        Box::new(DataType::List(new_box_field(
-                            "level2",
-                            DataType::Float32,
+                        new_box_field(
+                            "item",
+                            DataType::List(new_box_field(
+                                "level2",
+                                DataType::Float32,
+                                true,
+                            )),
                             true,
-                        ))),
+                        ),
                     ),
                 ]),
-                Box::new(DataType::List(new_box_field(
-                    "level1",
-                    DataType::List(new_box_field("level2", DataType::Float32, true)),
+                new_box_field(
+                    "item",
+                    DataType::List(new_box_field(
+                        "level1",
+                        DataType::List(new_box_field("level2", DataType::Float32, true)),
+                        true,
+                    )),
                     true,
-                ))),
+                ),
             ),
         ];
 
@@ -624,9 +660,10 @@ mod roundtrip_tests {
         ];
 
         for test_case in should_pass.into_iter() {
-            let proto: super::protobuf::ScalarType = (&test_case).try_into().unwrap();
-            let roundtrip: DataType = (&proto).try_into().unwrap();
-            assert_eq!(format!("{:?}", test_case), format!("{:?}", roundtrip));
+            let field = Field::new("item", test_case, true);
+            let proto: super::protobuf::Field = (&field).try_into().unwrap();
+            let roundtrip: Field = (&proto).try_into().unwrap();
+            assert_eq!(format!("{:?}", field), format!("{:?}", roundtrip));
         }
 
         let mut success: Vec<DataType> = Vec::new();
@@ -634,7 +671,7 @@ mod roundtrip_tests {
             let proto: std::result::Result<
                 super::protobuf::ScalarType,
                 super::to_proto::Error,
-            > = (&test_case).try_into();
+            > = (&Field::new("item", test_case.clone(), true)).try_into();
             if proto.is_ok() {
                 success.push(test_case)
             }
@@ -803,7 +840,10 @@ mod roundtrip_tests {
             ScalarValue::Date32(None),
             ScalarValue::TimestampMicrosecond(None, None),
             ScalarValue::TimestampNanosecond(None, None),
-            // ScalarValue::List(None, DataType::Boolean)
+            ScalarValue::List(
+                None,
+                Box::new(Field::new("item", DataType::Boolean, false)),
+            ),
         ];
 
         for test_case in test_types.into_iter() {
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index afe24ea89..d5ba5c38a 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -835,25 +835,28 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
                     Value::LargeUtf8Value(s.to_owned())
                 })
             }
-            scalar::ScalarValue::List(value, datatype) => {
-                println!("Current datatype of list: {:?}", datatype);
+            scalar::ScalarValue::List(value, boxed_field) => {
+                println!("Current field of list: {:?}", boxed_field);
                 match value {
                     Some(values) => {
                         if values.is_empty() {
                             protobuf::ScalarValue {
                                 value: Some(protobuf::scalar_value::Value::ListValue(
                                     protobuf::ScalarListValue {
-                                        datatype: Some(datatype.as_ref().try_into()?),
+                                        field: Some(boxed_field.as_ref().into()),
                                         values: Vec::new(),
                                     },
                                 )),
                             }
                         } else {
-                            let scalar_type = match datatype.as_ref() {
+                            let scalar_type = match boxed_field.data_type() {
                                 DataType::List(field) => field.as_ref().data_type(),
-                                _ => todo!("Proper error handling"),
+                                unsupported => {
+                                    todo!("Proper error handling {}", unsupported)
+                                }
                             };
                             println!("Current scalar type for list: {:?}", scalar_type);
+
                             let type_checked_values: Vec<protobuf::ScalarValue> = values
                                 .iter()
                                 .map(|scalar| match (scalar, scalar_type) {
@@ -862,7 +865,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
                                         DataType::List(field),
                                     ) => {
                                         if let DataType::List(list_field) =
-                                            list_type.as_ref()
+                                            list_type.data_type()
                                         {
                                             let scalar_datatype = field.data_type();
                                             let list_datatype = list_field.data_type();
@@ -879,7 +882,8 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
                                             scalar.try_into()
                                         } else {
                                             Err(Error::inconsistent_list_designated(
-                                                scalar, datatype,
+                                                scalar,
+                                                boxed_field.data_type(),
                                             ))
                                         }
                                     }
@@ -930,14 +934,15 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
                                         DataType::LargeUtf8,
                                     ) => scalar.try_into(),
                                     _ => Err(Error::inconsistent_list_designated(
-                                        scalar, datatype,
+                                        scalar,
+                                        boxed_field.data_type(),
                                     )),
                                 })
                                 .collect::<Result<Vec<_>, _>>()?;
                             protobuf::ScalarValue {
                                 value: Some(protobuf::scalar_value::Value::ListValue(
                                     protobuf::ScalarListValue {
-                                        datatype: Some(datatype.as_ref().try_into()?),
+                                        field: Some(boxed_field.as_ref().into()),
                                         values: type_checked_values,
                                     },
                                 )),
@@ -946,7 +951,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
                     }
                     None => protobuf::ScalarValue {
                         value: Some(protobuf::scalar_value::Value::NullListValue(
-                            datatype.as_ref().try_into()?,
+                            boxed_field.as_ref().try_into()?,
                         )),
                     },
                 }
@@ -1121,11 +1126,11 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
     }
 }
 
-impl TryFrom<&DataType> for protobuf::ScalarType {
+impl TryFrom<&Field> for protobuf::ScalarType {
     type Error = Error;
 
-    fn try_from(value: &DataType) -> Result<Self, Self::Error> {
-        let datatype = protobuf::scalar_type::Datatype::try_from(value)?;
+    fn try_from(value: &Field) -> Result<Self, Self::Error> {
+        let datatype = protobuf::scalar_type::Datatype::try_from(value.data_type())?;
         Ok(Self {
             datatype: Some(datatype),
         })
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index edabbf8f2..39b20b2e0 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -2501,7 +2501,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         if data_types.is_empty() {
             Ok(Expr::Literal(ScalarValue::List(
                 None,
-                Box::new(DataType::Utf8),
+                Box::new(Field::new("item", DataType::Utf8, true)),
             )))
         } else if data_types.len() > 1 {
             Err(DataFusionError::NotImplemented(format!(
@@ -2513,7 +2513,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
             Ok(Expr::Literal(ScalarValue::List(
                 Some(values),
-                Box::new(data_type),
+                Box::new(Field::new("item", data_type, true)),
             )))
         }
     }