You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 07:40:03 UTC

[arrow-datafusion] 01/03: move column, dfschema, etc. to common module

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch datafusion-common-scalar
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit a076a37610640abb36abe7cc026467434a664ef6
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:23:43 2022 +0800

    move column, dfschema, etc. to common module
---
 datafusion-common/Cargo.toml            |    1 +
 datafusion-common/src/column.rs         |    2 +-
 datafusion-common/src/dfschema.rs       |   34 +
 datafusion-common/src/lib.rs            |    6 +-
 datafusion/src/logical_plan/builder.rs  |   48 +-
 datafusion/src/logical_plan/dfschema.rs |  669 +----
 datafusion/src/logical_plan/expr.rs     |  174 +-
 datafusion/src/scalar.rs                | 4190 +++++++++----------------------
 8 files changed, 1206 insertions(+), 3918 deletions(-)

diff --git a/datafusion-common/Cargo.toml b/datafusion-common/Cargo.toml
index 907ddba..9759558 100644
--- a/datafusion-common/Cargo.toml
+++ b/datafusion-common/Cargo.toml
@@ -43,3 +43,4 @@ parquet = { version = "8.0.0", features = ["arrow"] }
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
 pyo3 = { version = "0.15", optional = true }
 sqlparser = "0.13"
+ordered-float = "2.10"
diff --git a/datafusion-common/src/column.rs b/datafusion-common/src/column.rs
index 5d68916..02faa24 100644
--- a/datafusion-common/src/column.rs
+++ b/datafusion-common/src/column.rs
@@ -75,7 +75,7 @@ impl Column {
     }
 
     // Internal implementation of normalize
-    fn normalize_with_schemas(
+    pub fn normalize_with_schemas(
         self,
         schemas: &[&Arc<DFSchema>],
         using_columns: &[HashSet<Column>],
diff --git a/datafusion-common/src/dfschema.rs b/datafusion-common/src/dfschema.rs
index 453932a..46321c3 100644
--- a/datafusion-common/src/dfschema.rs
+++ b/datafusion-common/src/dfschema.rs
@@ -402,6 +402,40 @@ impl Display for DFSchema {
     }
 }
 
+/// Provides schema information needed by [Expr] methods such as
+/// [Expr::nullable] and [Expr::data_type].
+///
+/// Note that this trait is implemented for &[DFSchema] which is
+/// widely used in the DataFusion codebase.
+pub trait ExprSchema {
+    /// Is this column reference nullable?
+    fn nullable(&self, col: &Column) -> Result<bool>;
+
+    /// What is the datatype of this column?
+    fn data_type(&self, col: &Column) -> Result<&DataType>;
+}
+
+// Implement `ExprSchema` for `Arc<DFSchema>`
+impl<P: AsRef<DFSchema>> ExprSchema for P {
+    fn nullable(&self, col: &Column) -> Result<bool> {
+        self.as_ref().nullable(col)
+    }
+
+    fn data_type(&self, col: &Column) -> Result<&DataType> {
+        self.as_ref().data_type(col)
+    }
+}
+
+impl ExprSchema for DFSchema {
+    fn nullable(&self, col: &Column) -> Result<bool> {
+        Ok(self.field_from_column(col)?.is_nullable())
+    }
+
+    fn data_type(&self, col: &Column) -> Result<&DataType> {
+        Ok(self.field_from_column(col)?.data_type())
+    }
+}
+
 /// DFField wraps an Arrow field and adds an optional qualifier
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct DFField {
diff --git a/datafusion-common/src/lib.rs b/datafusion-common/src/lib.rs
index 8090fc0..e82ff75 100644
--- a/datafusion-common/src/lib.rs
+++ b/datafusion-common/src/lib.rs
@@ -18,7 +18,11 @@
 mod column;
 mod dfschema;
 mod error;
+mod scalar;
 
 pub use column::Column;
-pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
+pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
 pub use error::{DataFusionError, Result};
+pub use scalar::{
+  ScalarType, ScalarValue, MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128,
+};
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 613c8e9..d81fa9d 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -595,6 +595,17 @@ impl LogicalPlanBuilder {
         self.join_detailed(right, join_type, join_keys, false)
     }
 
+    fn normalize(
+        plan: &LogicalPlan,
+        column: impl Into<Column> + Clone,
+    ) -> Result<Column> {
+        let schemas = plan.all_schemas();
+        let using_columns = plan.using_columns()?;
+        column
+            .into()
+            .normalize_with_schemas(&schemas, &using_columns)
+    }
+
     /// Apply a join with on constraint and specified null equality
     /// If null_equals_null is true then null == null, else null != null
     pub fn join_detailed(
@@ -633,7 +644,10 @@ impl LogicalPlanBuilder {
                             match (l_is_left, l_is_right, r_is_left, r_is_right) {
                                 (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
                                 (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
-                                _ => (l.normalize(&self.plan), r.normalize(right)),
+                                _ => (
+                                    Self::normalize(&self.plan, l),
+                                    Self::normalize(right, r),
+                                ),
                             }
                         }
                         (Some(lr), None) => {
@@ -643,9 +657,12 @@ impl LogicalPlanBuilder {
                                 right.schema().field_with_qualified_name(lr, &l.name);
 
                             match (l_is_left, l_is_right) {
-                                (Ok(_), _) => (Ok(l), r.normalize(right)),
-                                (_, Ok(_)) => (r.normalize(&self.plan), Ok(l)),
-                                _ => (l.normalize(&self.plan), r.normalize(right)),
+                                (Ok(_), _) => (Ok(l), Self::normalize(right, r)),
+                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
+                                _ => (
+                                    Self::normalize(&self.plan, l),
+                                    Self::normalize(right, r),
+                                ),
                             }
                         }
                         (None, Some(rr)) => {
@@ -655,22 +672,25 @@ impl LogicalPlanBuilder {
                                 right.schema().field_with_qualified_name(rr, &r.name);
 
                             match (r_is_left, r_is_right) {
-                                (Ok(_), _) => (Ok(r), l.normalize(right)),
-                                (_, Ok(_)) => (l.normalize(&self.plan), Ok(r)),
-                                _ => (l.normalize(&self.plan), r.normalize(right)),
+                                (Ok(_), _) => (Ok(r), Self::normalize(right, l)),
+                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
+                                _ => (
+                                    Self::normalize(&self.plan, l),
+                                    Self::normalize(right, r),
+                                ),
                             }
                         }
                         (None, None) => {
                             let mut swap = false;
-                            let left_key =
-                                l.clone().normalize(&self.plan).or_else(|_| {
+                            let left_key = Self::normalize(&self.plan, l.clone())
+                                .or_else(|_| {
                                     swap = true;
-                                    l.normalize(right)
+                                    Self::normalize(right, l)
                                 });
                             if swap {
-                                (r.normalize(&self.plan), left_key)
+                                (Self::normalize(&self.plan, r), left_key)
                             } else {
-                                (left_key, r.normalize(right))
+                                (left_key, Self::normalize(right, r))
                             }
                         }
                     }
@@ -705,11 +725,11 @@ impl LogicalPlanBuilder {
         let left_keys: Vec<Column> = using_keys
             .clone()
             .into_iter()
-            .map(|c| c.into().normalize(&self.plan))
+            .map(|c| Self::normalize(&self.plan, c))
             .collect::<Result<_>>()?;
         let right_keys: Vec<Column> = using_keys
             .into_iter()
-            .map(|c| c.into().normalize(right))
+            .map(|c| Self::normalize(right, c))
             .collect::<Result<_>>()?;
 
         let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect();
diff --git a/datafusion/src/logical_plan/dfschema.rs b/datafusion/src/logical_plan/dfschema.rs
index 7b6471f..eb62428 100644
--- a/datafusion/src/logical_plan/dfschema.rs
+++ b/datafusion/src/logical_plan/dfschema.rs
@@ -18,671 +18,4 @@
 //! DFSchema is an extended schema struct that DataFusion uses to provide support for
 //! fields with optional relation names.
 
-use std::collections::HashSet;
-use std::convert::TryFrom;
-use std::sync::Arc;
-
-use crate::error::{DataFusionError, Result};
-use crate::logical_plan::Column;
-
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use std::fmt::{Display, Formatter};
-
-/// A reference-counted reference to a `DFSchema`.
-pub type DFSchemaRef = Arc<DFSchema>;
-
-/// DFSchema wraps an Arrow schema and adds relation names
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct DFSchema {
-    /// Fields
-    fields: Vec<DFField>,
-}
-
-impl DFSchema {
-    /// Creates an empty `DFSchema`
-    pub fn empty() -> Self {
-        Self { fields: vec![] }
-    }
-
-    /// Create a new `DFSchema`
-    pub fn new(fields: Vec<DFField>) -> Result<Self> {
-        let mut qualified_names = HashSet::new();
-        let mut unqualified_names = HashSet::new();
-
-        for field in &fields {
-            if let Some(qualifier) = field.qualifier() {
-                if !qualified_names.insert((qualifier, field.name())) {
-                    return Err(DataFusionError::Plan(format!(
-                        "Schema contains duplicate qualified field name '{}'",
-                        field.qualified_name()
-                    )));
-                }
-            } else if !unqualified_names.insert(field.name()) {
-                return Err(DataFusionError::Plan(format!(
-                    "Schema contains duplicate unqualified field name '{}'",
-                    field.name()
-                )));
-            }
-        }
-
-        // check for mix of qualified and unqualified field with same unqualified name
-        // note that we need to sort the contents of the HashSet first so that errors are
-        // deterministic
-        let mut qualified_names = qualified_names
-            .iter()
-            .map(|(l, r)| (l.to_owned(), r.to_owned()))
-            .collect::<Vec<(&String, &String)>>();
-        qualified_names.sort_by(|a, b| {
-            let a = format!("{}.{}", a.0, a.1);
-            let b = format!("{}.{}", b.0, b.1);
-            a.cmp(&b)
-        });
-        for (qualifier, name) in &qualified_names {
-            if unqualified_names.contains(name) {
-                return Err(DataFusionError::Plan(format!(
-                    "Schema contains qualified field name '{}.{}' \
-                    and unqualified field name '{}' which would be ambiguous",
-                    qualifier, name, name
-                )));
-            }
-        }
-        Ok(Self { fields })
-    }
-
-    /// Create a `DFSchema` from an Arrow schema
-    pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) -> Result<Self> {
-        Self::new(
-            schema
-                .fields()
-                .iter()
-                .map(|f| DFField::from_qualified(qualifier, f.clone()))
-                .collect(),
-        )
-    }
-
-    /// Combine two schemas
-    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
-        let mut fields = self.fields.clone();
-        fields.extend_from_slice(schema.fields().as_slice());
-        Self::new(fields)
-    }
-
-    /// Merge a schema into self
-    pub fn merge(&mut self, other_schema: &DFSchema) {
-        for field in other_schema.fields() {
-            // skip duplicate columns
-            let duplicated_field = match field.qualifier() {
-                Some(q) => self.field_with_name(Some(q.as_str()), field.name()).is_ok(),
-                // for unqualifed columns, check as unqualified name
-                None => self.field_with_unqualified_name(field.name()).is_ok(),
-            };
-            if !duplicated_field {
-                self.fields.push(field.clone());
-            }
-        }
-    }
-
-    /// Get a list of fields
-    pub fn fields(&self) -> &Vec<DFField> {
-        &self.fields
-    }
-
-    /// Returns an immutable reference of a specific `Field` instance selected using an
-    /// offset within the internal `fields` vector
-    pub fn field(&self, i: usize) -> &DFField {
-        &self.fields[i]
-    }
-
-    /// Find the index of the column with the given unqualified name
-    pub fn index_of(&self, name: &str) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            if self.fields[i].name() == name {
-                return Ok(i);
-            }
-        }
-        Err(DataFusionError::Plan(format!(
-            "No field named '{}'. Valid fields are {}.",
-            name,
-            self.get_field_names()
-        )))
-    }
-
-    fn index_of_column_by_name(
-        &self,
-        qualifier: Option<&str>,
-        name: &str,
-    ) -> Result<usize> {
-        let mut matches = self
-            .fields
-            .iter()
-            .enumerate()
-            .filter(|(_, field)| match (qualifier, &field.qualifier) {
-                // field to lookup is qualified.
-                // current field is qualified and not shared between relations, compare both
-                // qualifier and name.
-                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
-                // field to lookup is qualified but current field is unqualified.
-                (Some(_), None) => false,
-                // field to lookup is unqualified, no need to compare qualifier
-                (None, Some(_)) | (None, None) => field.name() == name,
-            })
-            .map(|(idx, _)| idx);
-        match matches.next() {
-            None => Err(DataFusionError::Plan(format!(
-                "No field named '{}.{}'. Valid fields are {}.",
-                qualifier.unwrap_or("<unqualified>"),
-                name,
-                self.get_field_names()
-            ))),
-            Some(idx) => match matches.next() {
-                None => Ok(idx),
-                // found more than one matches
-                Some(_) => Err(DataFusionError::Internal(format!(
-                    "Ambiguous reference to qualified field named '{}.{}'",
-                    qualifier.unwrap_or("<unqualified>"),
-                    name
-                ))),
-            },
-        }
-    }
-
-    /// Find the index of the column with the given qualifier and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        self.index_of_column_by_name(col.relation.as_deref(), &col.name)
-    }
-
-    /// Find the field with the given name
-    pub fn field_with_name(
-        &self,
-        qualifier: Option<&str>,
-        name: &str,
-    ) -> Result<&DFField> {
-        if let Some(qualifier) = qualifier {
-            self.field_with_qualified_name(qualifier, name)
-        } else {
-            self.field_with_unqualified_name(name)
-        }
-    }
-
-    /// Find all fields match the given name
-    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
-        self.fields
-            .iter()
-            .filter(|field| field.name() == name)
-            .collect()
-    }
-
-    /// Find the field with the given name
-    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> {
-        let matches = self.fields_with_unqualified_name(name);
-        match matches.len() {
-            0 => Err(DataFusionError::Plan(format!(
-                "No field with unqualified name '{}'. Valid fields are {}.",
-                name,
-                self.get_field_names()
-            ))),
-            1 => Ok(matches[0]),
-            _ => Err(DataFusionError::Plan(format!(
-                "Ambiguous reference to field named '{}'",
-                name
-            ))),
-        }
-    }
-
-    /// Find the field with the given qualified name
-    pub fn field_with_qualified_name(
-        &self,
-        qualifier: &str,
-        name: &str,
-    ) -> Result<&DFField> {
-        let idx = self.index_of_column_by_name(Some(qualifier), name)?;
-        Ok(self.field(idx))
-    }
-
-    /// Find the field with the given qualified column
-    pub fn field_from_column(&self, column: &Column) -> Result<&DFField> {
-        match &column.relation {
-            Some(r) => self.field_with_qualified_name(r, &column.name),
-            None => self.field_with_unqualified_name(&column.name),
-        }
-    }
-
-    /// Check to see if unqualified field names matches field names in Arrow schema
-    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
-        self.fields
-            .iter()
-            .zip(arrow_schema.fields().iter())
-            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
-    }
-
-    /// Strip all field qualifier in schema
-    pub fn strip_qualifiers(self) -> Self {
-        DFSchema {
-            fields: self
-                .fields
-                .into_iter()
-                .map(|f| f.strip_qualifier())
-                .collect(),
-        }
-    }
-
-    /// Replace all field qualifier with new value in schema
-    pub fn replace_qualifier(self, qualifier: &str) -> Self {
-        DFSchema {
-            fields: self
-                .fields
-                .into_iter()
-                .map(|f| {
-                    DFField::new(
-                        Some(qualifier),
-                        f.name(),
-                        f.data_type().to_owned(),
-                        f.is_nullable(),
-                    )
-                })
-                .collect(),
-        }
-    }
-
-    /// Get comma-seperated list of field names for use in error messages
-    fn get_field_names(&self) -> String {
-        self.fields
-            .iter()
-            .map(|f| match f.qualifier() {
-                Some(qualifier) => format!("'{}.{}'", qualifier, f.name()),
-                None => format!("'{}'", f.name()),
-            })
-            .collect::<Vec<_>>()
-            .join(", ")
-    }
-}
-
-impl From<DFSchema> for Schema {
-    /// Convert DFSchema into a Schema
-    fn from(df_schema: DFSchema) -> Self {
-        Schema::new(
-            df_schema
-                .fields
-                .into_iter()
-                .map(|f| {
-                    if f.qualifier().is_some() {
-                        Field::new(
-                            f.name().as_str(),
-                            f.data_type().to_owned(),
-                            f.is_nullable(),
-                        )
-                    } else {
-                        f.field
-                    }
-                })
-                .collect(),
-        )
-    }
-}
-
-impl From<&DFSchema> for Schema {
-    /// Convert DFSchema reference into a Schema
-    fn from(df_schema: &DFSchema) -> Self {
-        Schema::new(df_schema.fields.iter().map(|f| f.field.clone()).collect())
-    }
-}
-
-/// Create a `DFSchema` from an Arrow schema
-impl TryFrom<Schema> for DFSchema {
-    type Error = DataFusionError;
-    fn try_from(schema: Schema) -> std::result::Result<Self, Self::Error> {
-        Self::new(
-            schema
-                .fields()
-                .iter()
-                .map(|f| DFField::from(f.clone()))
-                .collect(),
-        )
-    }
-}
-
-impl From<DFSchema> for SchemaRef {
-    fn from(df_schema: DFSchema) -> Self {
-        SchemaRef::new(df_schema.into())
-    }
-}
-
-/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
-pub trait ToDFSchema
-where
-    Self: Sized,
-{
-    /// Attempt to create a DSSchema
-    #[allow(clippy::wrong_self_convention)]
-    fn to_dfschema(self) -> Result<DFSchema>;
-
-    /// Attempt to create a DSSchemaRef
-    #[allow(clippy::wrong_self_convention)]
-    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
-        Ok(Arc::new(self.to_dfschema()?))
-    }
-}
-
-impl ToDFSchema for Schema {
-    #[allow(clippy::wrong_self_convention)]
-    fn to_dfschema(self) -> Result<DFSchema> {
-        DFSchema::try_from(self)
-    }
-}
-
-impl ToDFSchema for SchemaRef {
-    #[allow(clippy::wrong_self_convention)]
-    fn to_dfschema(self) -> Result<DFSchema> {
-        // Attempt to use the Schema directly if there are no other
-        // references, otherwise clone
-        match Self::try_unwrap(self) {
-            Ok(schema) => DFSchema::try_from(schema),
-            Err(schemaref) => DFSchema::try_from(schemaref.as_ref().clone()),
-        }
-    }
-}
-
-impl ToDFSchema for Vec<DFField> {
-    fn to_dfschema(self) -> Result<DFSchema> {
-        DFSchema::new(self)
-    }
-}
-
-impl Display for DFSchema {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(
-            f,
-            "{}",
-            self.fields
-                .iter()
-                .map(|field| field.qualified_name())
-                .collect::<Vec<String>>()
-                .join(", ")
-        )
-    }
-}
-
-/// DFField wraps an Arrow field and adds an optional qualifier
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct DFField {
-    /// Optional qualifier (usually a table or relation name)
-    qualifier: Option<String>,
-    /// Arrow field definition
-    field: Field,
-}
-
-impl DFField {
-    /// Creates a new `DFField`
-    pub fn new(
-        qualifier: Option<&str>,
-        name: &str,
-        data_type: DataType,
-        nullable: bool,
-    ) -> Self {
-        DFField {
-            qualifier: qualifier.map(|s| s.to_owned()),
-            field: Field::new(name, data_type, nullable),
-        }
-    }
-
-    /// Create an unqualified field from an existing Arrow field
-    pub fn from(field: Field) -> Self {
-        Self {
-            qualifier: None,
-            field,
-        }
-    }
-
-    /// Create a qualified field from an existing Arrow field
-    pub fn from_qualified(qualifier: &str, field: Field) -> Self {
-        Self {
-            qualifier: Some(qualifier.to_owned()),
-            field,
-        }
-    }
-
-    /// Returns an immutable reference to the `DFField`'s unqualified name
-    pub fn name(&self) -> &String {
-        self.field.name()
-    }
-
-    /// Returns an immutable reference to the `DFField`'s data-type
-    pub fn data_type(&self) -> &DataType {
-        self.field.data_type()
-    }
-
-    /// Indicates whether this `DFField` supports null values
-    pub fn is_nullable(&self) -> bool {
-        self.field.is_nullable()
-    }
-
-    /// Returns a string to the `DFField`'s qualified name
-    pub fn qualified_name(&self) -> String {
-        if let Some(qualifier) = &self.qualifier {
-            format!("{}.{}", qualifier, self.field.name())
-        } else {
-            self.field.name().to_owned()
-        }
-    }
-
-    /// Builds a qualified column based on self
-    pub fn qualified_column(&self) -> Column {
-        Column {
-            relation: self.qualifier.clone(),
-            name: self.field.name().to_string(),
-        }
-    }
-
-    /// Builds an unqualified column based on self
-    pub fn unqualified_column(&self) -> Column {
-        Column {
-            relation: None,
-            name: self.field.name().to_string(),
-        }
-    }
-
-    /// Get the optional qualifier
-    pub fn qualifier(&self) -> Option<&String> {
-        self.qualifier.as_ref()
-    }
-
-    /// Get the arrow field
-    pub fn field(&self) -> &Field {
-        &self.field
-    }
-
-    /// Return field with qualifier stripped
-    pub fn strip_qualifier(mut self) -> Self {
-        self.qualifier = None;
-        self
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use arrow::datatypes::DataType;
-
-    #[test]
-    fn from_unqualified_field() {
-        let field = Field::new("c0", DataType::Boolean, true);
-        let field = DFField::from(field);
-        assert_eq!("c0", field.name());
-        assert_eq!("c0", field.qualified_name());
-    }
-
-    #[test]
-    fn from_qualified_field() {
-        let field = Field::new("c0", DataType::Boolean, true);
-        let field = DFField::from_qualified("t1", field);
-        assert_eq!("c0", field.name());
-        assert_eq!("t1.c0", field.qualified_name());
-    }
-
-    #[test]
-    fn from_unqualified_schema() -> Result<()> {
-        let schema = DFSchema::try_from(test_schema_1())?;
-        assert_eq!("c0, c1", schema.to_string());
-        Ok(())
-    }
-
-    #[test]
-    fn from_qualified_schema() -> Result<()> {
-        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        assert_eq!("t1.c0, t1.c1", schema.to_string());
-        Ok(())
-    }
-
-    #[test]
-    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
-        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let arrow_schema: Schema = schema.into();
-        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, \
-        Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }";
-        assert_eq!(expected, arrow_schema.to_string());
-        Ok(())
-    }
-
-    #[test]
-    fn join_qualified() -> Result<()> {
-        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
-        let join = left.join(&right)?;
-        assert_eq!("t1.c0, t1.c1, t2.c0, t2.c1", join.to_string());
-        // test valid access
-        assert!(join.field_with_qualified_name("t1", "c0").is_ok());
-        assert!(join.field_with_qualified_name("t2", "c0").is_ok());
-        // test invalid access
-        assert!(join.field_with_unqualified_name("c0").is_err());
-        assert!(join.field_with_unqualified_name("t1.c0").is_err());
-        assert!(join.field_with_unqualified_name("t2.c0").is_err());
-        Ok(())
-    }
-
-    #[test]
-    fn join_qualified_duplicate() -> Result<()> {
-        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let join = left.join(&right);
-        assert!(join.is_err());
-        assert_eq!(
-            "Error during planning: Schema contains duplicate \
-        qualified field name \'t1.c0\'",
-            &format!("{}", join.err().unwrap())
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn join_unqualified_duplicate() -> Result<()> {
-        let left = DFSchema::try_from(test_schema_1())?;
-        let right = DFSchema::try_from(test_schema_1())?;
-        let join = left.join(&right);
-        assert!(join.is_err());
-        assert_eq!(
-            "Error during planning: Schema contains duplicate \
-        unqualified field name \'c0\'",
-            &format!("{}", join.err().unwrap())
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn join_mixed() -> Result<()> {
-        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let right = DFSchema::try_from(test_schema_2())?;
-        let join = left.join(&right)?;
-        assert_eq!("t1.c0, t1.c1, c100, c101", join.to_string());
-        // test valid access
-        assert!(join.field_with_qualified_name("t1", "c0").is_ok());
-        assert!(join.field_with_unqualified_name("c0").is_ok());
-        assert!(join.field_with_unqualified_name("c100").is_ok());
-        assert!(join.field_with_name(None, "c100").is_ok());
-        // test invalid access
-        assert!(join.field_with_unqualified_name("t1.c0").is_err());
-        assert!(join.field_with_unqualified_name("t1.c100").is_err());
-        assert!(join.field_with_qualified_name("", "c100").is_err());
-        Ok(())
-    }
-
-    #[test]
-    fn join_mixed_duplicate() -> Result<()> {
-        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let right = DFSchema::try_from(test_schema_1())?;
-        let join = left.join(&right);
-        assert!(join.is_err());
-        assert_eq!(
-            "Error during planning: Schema contains qualified \
-        field name \'t1.c0\' and unqualified field name \'c0\' which would be ambiguous",
-            &format!("{}", join.err().unwrap())
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn helpful_error_messages() -> Result<()> {
-        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let expected_help = "Valid fields are \'t1.c0\', \'t1.c1\'.";
-        assert!(schema
-            .field_with_qualified_name("x", "y")
-            .unwrap_err()
-            .to_string()
-            .contains(expected_help));
-        assert!(schema
-            .field_with_unqualified_name("y")
-            .unwrap_err()
-            .to_string()
-            .contains(expected_help));
-        assert!(schema
-            .index_of("y")
-            .unwrap_err()
-            .to_string()
-            .contains(expected_help));
-        Ok(())
-    }
-
-    #[test]
-    fn into() {
-        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
-        let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Int64, true)]);
-        let arrow_schema_ref = Arc::new(arrow_schema.clone());
-
-        let df_schema =
-            DFSchema::new(vec![DFField::new(None, "c0", DataType::Int64, true)]).unwrap();
-        let df_schema_ref = Arc::new(df_schema.clone());
-
-        {
-            let arrow_schema = arrow_schema.clone();
-            let arrow_schema_ref = arrow_schema_ref.clone();
-
-            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
-            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
-        }
-
-        {
-            let arrow_schema = arrow_schema.clone();
-            let arrow_schema_ref = arrow_schema_ref.clone();
-
-            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
-            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
-        }
-
-        // Now, consume the refs
-        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
-        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
-    }
-
-    fn test_schema_1() -> Schema {
-        Schema::new(vec![
-            Field::new("c0", DataType::Boolean, true),
-            Field::new("c1", DataType::Boolean, true),
-        ])
-    }
-
-    fn test_schema_2() -> Schema {
-        Schema::new(vec![
-            Field::new("c100", DataType::Boolean, true),
-            Field::new("c101", DataType::Boolean, true),
-        ])
-    }
-}
+pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 300c751..fba7d81 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -34,152 +34,14 @@ use crate::physical_plan::{
 use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
 use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
 use arrow::{compute::can_cast_types, datatypes::DataType};
+pub use datafusion_common::{Column, ExprSchema};
 use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
 use std::collections::{HashMap, HashSet};
-use std::convert::Infallible;
 use std::fmt;
 use std::hash::{BuildHasher, Hash, Hasher};
 use std::ops::Not;
-use std::str::FromStr;
 use std::sync::Arc;
 
-/// A named reference to a qualified field in a schema.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
-pub struct Column {
-    /// relation/table name.
-    pub relation: Option<String>,
-    /// field/column name.
-    pub name: String,
-}
-
-impl Column {
-    /// Create Column from unqualified name.
-    pub fn from_name(name: impl Into<String>) -> Self {
-        Self {
-            relation: None,
-            name: name.into(),
-        }
-    }
-
-    /// Deserialize a fully qualified name string into a column
-    pub fn from_qualified_name(flat_name: &str) -> Self {
-        use sqlparser::tokenizer::Token;
-
-        let dialect = sqlparser::dialect::GenericDialect {};
-        let mut tokenizer = sqlparser::tokenizer::Tokenizer::new(&dialect, flat_name);
-        if let Ok(tokens) = tokenizer.tokenize() {
-            if let [Token::Word(relation), Token::Period, Token::Word(name)] =
-                tokens.as_slice()
-            {
-                return Column {
-                    relation: Some(relation.value.clone()),
-                    name: name.value.clone(),
-                };
-            }
-        }
-        // any expression that's not in the form of `foo.bar` will be treated as unqualified column
-        // name
-        Column {
-            relation: None,
-            name: String::from(flat_name),
-        }
-    }
-
-    /// Serialize column into a flat name string
-    pub fn flat_name(&self) -> String {
-        match &self.relation {
-            Some(r) => format!("{}.{}", r, self.name),
-            None => self.name.clone(),
-        }
-    }
-
-    /// Normalizes `self` if is unqualified (has no relation name)
-    /// with an explicit qualifier from the first matching input
-    /// schemas.
-    ///
-    /// For example, `foo` will be normalized to `t.foo` if there is a
-    /// column named `foo` in a relation named `t` found in `schemas`
-    pub fn normalize(self, plan: &LogicalPlan) -> Result<Self> {
-        let schemas = plan.all_schemas();
-        let using_columns = plan.using_columns()?;
-        self.normalize_with_schemas(&schemas, &using_columns)
-    }
-
-    // Internal implementation of normalize
-    fn normalize_with_schemas(
-        self,
-        schemas: &[&Arc<DFSchema>],
-        using_columns: &[HashSet<Column>],
-    ) -> Result<Self> {
-        if self.relation.is_some() {
-            return Ok(self);
-        }
-
-        for schema in schemas {
-            let fields = schema.fields_with_unqualified_name(&self.name);
-            match fields.len() {
-                0 => continue,
-                1 => {
-                    return Ok(fields[0].qualified_column());
-                }
-                _ => {
-                    // More than 1 fields in this schema have their names set to self.name.
-                    //
-                    // This should only happen when a JOIN query with USING constraint references
-                    // join columns using unqualified column name. For example:
-                    //
-                    // ```sql
-                    // SELECT id FROM t1 JOIN t2 USING(id)
-                    // ```
-                    //
-                    // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
-                    // We will use the relation from the first matched field to normalize self.
-
-                    // Compare matched fields with one USING JOIN clause at a time
-                    for using_col in using_columns {
-                        let all_matched = fields
-                            .iter()
-                            .all(|f| using_col.contains(&f.qualified_column()));
-                        // All matched fields belong to the same using column set, in orther words
-                        // the same join clause. We simply pick the qualifer from the first match.
-                        if all_matched {
-                            return Ok(fields[0].qualified_column());
-                        }
-                    }
-                }
-            }
-        }
-
-        Err(DataFusionError::Plan(format!(
-            "Column {} not found in provided schemas",
-            self
-        )))
-    }
-}
-
-impl From<&str> for Column {
-    fn from(c: &str) -> Self {
-        Self::from_qualified_name(c)
-    }
-}
-
-impl FromStr for Column {
-    type Err = Infallible;
-
-    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        Ok(s.into())
-    }
-}
-
-impl fmt::Display for Column {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match &self.relation {
-            Some(r) => write!(f, "#{}.{}", r, self.name),
-            None => write!(f, "#{}", self.name),
-        }
-    }
-}
-
 /// `Expr` is a central struct of DataFusion's query API, and
 /// represent logical expressions such as `A + 1`, or `CAST(c1 AS
 /// int)`.
@@ -392,40 +254,6 @@ impl PartialOrd for Expr {
     }
 }
 
-/// Provides schema information needed by [Expr] methods such as
-/// [Expr::nullable] and [Expr::data_type].
-///
-/// Note that this trait is implemented for &[DFSchema] which is
-/// widely used in the DataFusion codebase.
-pub trait ExprSchema {
-    /// Is this column reference nullable?
-    fn nullable(&self, col: &Column) -> Result<bool>;
-
-    /// What is the datatype of this column?
-    fn data_type(&self, col: &Column) -> Result<&DataType>;
-}
-
-// Implement `ExprSchema` for `Arc<DFSchema>`
-impl<P: AsRef<DFSchema>> ExprSchema for P {
-    fn nullable(&self, col: &Column) -> Result<bool> {
-        self.as_ref().nullable(col)
-    }
-
-    fn data_type(&self, col: &Column) -> Result<&DataType> {
-        self.as_ref().data_type(col)
-    }
-}
-
-impl ExprSchema for DFSchema {
-    fn nullable(&self, col: &Column) -> Result<bool> {
-        Ok(self.field_from_column(col)?.is_nullable())
-    }
-
-    fn data_type(&self, col: &Column) -> Result<&DataType> {
-        Ok(self.field_from_column(col)?.data_type())
-    }
-}
-
 impl Expr {
     /// Returns the [arrow::datatypes::DataType] of the expression
     /// based on [ExprSchema]
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index 0f6a5ce..dc4c76e 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -15,3114 +15,1182 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! This module provides ScalarValue, an enum that can be used for storage of single elements
+//! ScalarValue reimported from datafusion-common
 
-use crate::error::{DataFusionError, Result};
-use arrow::{
+pub use datafusion_common::{
+  ScalarType, ScalarValue, MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128,
+};
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+  use crate::from_slice::FromSlice;
+  use arrow::{
     array::*,
     compute::kernels::cast::cast,
     datatypes::{
-        ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Float32Type,
-        Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit, TimeUnit,
-        TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
-        TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+      ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Float32Type, Float64Type,
+      Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit, TimeUnit,
+      TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
+      TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
     },
-};
-use ordered_float::OrderedFloat;
-use std::cmp::Ordering;
-use std::convert::{Infallible, TryInto};
-use std::str::FromStr;
-use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};
-
-// TODO may need to be moved to arrow-rs
-/// The max precision and scale for decimal128
-pub(crate) const MAX_PRECISION_FOR_DECIMAL128: usize = 38;
-pub(crate) const MAX_SCALE_FOR_DECIMAL128: usize = 38;
-
-/// Represents a dynamically typed, nullable single value.
-/// This is the single-valued counter-part of arrow’s `Array`.
-#[derive(Clone)]
-pub enum ScalarValue {
-    /// true or false value
-    Boolean(Option<bool>),
-    /// 32bit float
-    Float32(Option<f32>),
-    /// 64bit float
-    Float64(Option<f64>),
-    /// 128bit decimal, using the i128 to represent the decimal
-    Decimal128(Option<i128>, usize, usize),
-    /// signed 8bit int
-    Int8(Option<i8>),
-    /// signed 16bit int
-    Int16(Option<i16>),
-    /// signed 32bit int
-    Int32(Option<i32>),
-    /// signed 64bit int
-    Int64(Option<i64>),
-    /// unsigned 8bit int
-    UInt8(Option<u8>),
-    /// unsigned 16bit int
-    UInt16(Option<u16>),
-    /// unsigned 32bit int
-    UInt32(Option<u32>),
-    /// unsigned 64bit int
-    UInt64(Option<u64>),
-    /// utf-8 encoded string.
-    Utf8(Option<String>),
-    /// utf-8 encoded string representing a LargeString's arrow type.
-    LargeUtf8(Option<String>),
-    /// binary
-    Binary(Option<Vec<u8>>),
-    /// large binary
-    LargeBinary(Option<Vec<u8>>),
-    /// list of nested ScalarValue (boxed to reduce size_of(ScalarValue))
-    #[allow(clippy::box_collection)]
-    List(Option<Box<Vec<ScalarValue>>>, Box<DataType>),
-    /// Date stored as a signed 32bit int
-    Date32(Option<i32>),
-    /// Date stored as a signed 64bit int
-    Date64(Option<i64>),
-    /// Timestamp Second
-    TimestampSecond(Option<i64>, Option<String>),
-    /// Timestamp Milliseconds
-    TimestampMillisecond(Option<i64>, Option<String>),
-    /// Timestamp Microseconds
-    TimestampMicrosecond(Option<i64>, Option<String>),
-    /// Timestamp Nanoseconds
-    TimestampNanosecond(Option<i64>, Option<String>),
-    /// Interval with YearMonth unit
-    IntervalYearMonth(Option<i32>),
-    /// Interval with DayTime unit
-    IntervalDayTime(Option<i64>),
-    /// Interval with MonthDayNano unit
-    IntervalMonthDayNano(Option<i128>),
-    /// struct of nested ScalarValue (boxed to reduce size_of(ScalarValue))
-    #[allow(clippy::box_collection)]
-    Struct(Option<Box<Vec<ScalarValue>>>, Box<Vec<Field>>),
-}
-
-// manual implementation of `PartialEq` that uses OrderedFloat to
-// get defined behavior for floating point
-impl PartialEq for ScalarValue {
-    fn eq(&self, other: &Self) -> bool {
-        use ScalarValue::*;
-        // This purposely doesn't have a catch-all "(_, _)" so that
-        // any newly added enum variant will require editing this list
-        // or else face a compile error
-        match (self, other) {
-            (Decimal128(v1, p1, s1), Decimal128(v2, p2, s2)) => {
-                v1.eq(v2) && p1.eq(p2) && s1.eq(s2)
-            }
-            (Decimal128(_, _, _), _) => false,
-            (Boolean(v1), Boolean(v2)) => v1.eq(v2),
-            (Boolean(_), _) => false,
-            (Float32(v1), Float32(v2)) => {
-                let v1 = v1.map(OrderedFloat);
-                let v2 = v2.map(OrderedFloat);
-                v1.eq(&v2)
-            }
-            (Float32(_), _) => false,
-            (Float64(v1), Float64(v2)) => {
-                let v1 = v1.map(OrderedFloat);
-                let v2 = v2.map(OrderedFloat);
-                v1.eq(&v2)
-            }
-            (Float64(_), _) => false,
-            (Int8(v1), Int8(v2)) => v1.eq(v2),
-            (Int8(_), _) => false,
-            (Int16(v1), Int16(v2)) => v1.eq(v2),
-            (Int16(_), _) => false,
-            (Int32(v1), Int32(v2)) => v1.eq(v2),
-            (Int32(_), _) => false,
-            (Int64(v1), Int64(v2)) => v1.eq(v2),
-            (Int64(_), _) => false,
-            (UInt8(v1), UInt8(v2)) => v1.eq(v2),
-            (UInt8(_), _) => false,
-            (UInt16(v1), UInt16(v2)) => v1.eq(v2),
-            (UInt16(_), _) => false,
-            (UInt32(v1), UInt32(v2)) => v1.eq(v2),
-            (UInt32(_), _) => false,
-            (UInt64(v1), UInt64(v2)) => v1.eq(v2),
-            (UInt64(_), _) => false,
-            (Utf8(v1), Utf8(v2)) => v1.eq(v2),
-            (Utf8(_), _) => false,
-            (LargeUtf8(v1), LargeUtf8(v2)) => v1.eq(v2),
-            (LargeUtf8(_), _) => false,
-            (Binary(v1), Binary(v2)) => v1.eq(v2),
-            (Binary(_), _) => false,
-            (LargeBinary(v1), LargeBinary(v2)) => v1.eq(v2),
-            (LargeBinary(_), _) => false,
-            (List(v1, t1), List(v2, t2)) => v1.eq(v2) && t1.eq(t2),
-            (List(_, _), _) => false,
-            (Date32(v1), Date32(v2)) => v1.eq(v2),
-            (Date32(_), _) => false,
-            (Date64(v1), Date64(v2)) => v1.eq(v2),
-            (Date64(_), _) => false,
-            (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.eq(v2),
-            (TimestampSecond(_, _), _) => false,
-            (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.eq(v2),
-            (TimestampMillisecond(_, _), _) => false,
-            (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => v1.eq(v2),
-            (TimestampMicrosecond(_, _), _) => false,
-            (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => v1.eq(v2),
-            (TimestampNanosecond(_, _), _) => false,
-            (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.eq(v2),
-            (IntervalYearMonth(_), _) => false,
-            (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.eq(v2),
-            (IntervalDayTime(_), _) => false,
-            (IntervalMonthDayNano(v1), IntervalMonthDayNano(v2)) => v1.eq(v2),
-            (IntervalMonthDayNano(_), _) => false,
-            (Struct(v1, t1), Struct(v2, t2)) => v1.eq(v2) && t1.eq(t2),
-            (Struct(_, _), _) => false,
-        }
-    }
-}
-
-// manual implementation of `PartialOrd` that uses OrderedFloat to
-// get defined behavior for floating point
-impl PartialOrd for ScalarValue {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        use ScalarValue::*;
-        // This purposely doesn't have a catch-all "(_, _)" so that
-        // any newly added enum variant will require editing this list
-        // or else face a compile error
-        match (self, other) {
-            (Decimal128(v1, p1, s1), Decimal128(v2, p2, s2)) => {
-                if p1.eq(p2) && s1.eq(s2) {
-                    v1.partial_cmp(v2)
-                } else {
-                    // Two decimal values can be compared if they have the same precision and scale.
-                    None
-                }
-            }
-            (Decimal128(_, _, _), _) => None,
-            (Boolean(v1), Boolean(v2)) => v1.partial_cmp(v2),
-            (Boolean(_), _) => None,
-            (Float32(v1), Float32(v2)) => {
-                let v1 = v1.map(OrderedFloat);
-                let v2 = v2.map(OrderedFloat);
-                v1.partial_cmp(&v2)
-            }
-            (Float32(_), _) => None,
-            (Float64(v1), Float64(v2)) => {
-                let v1 = v1.map(OrderedFloat);
-                let v2 = v2.map(OrderedFloat);
-                v1.partial_cmp(&v2)
-            }
-            (Float64(_), _) => None,
-            (Int8(v1), Int8(v2)) => v1.partial_cmp(v2),
-            (Int8(_), _) => None,
-            (Int16(v1), Int16(v2)) => v1.partial_cmp(v2),
-            (Int16(_), _) => None,
-            (Int32(v1), Int32(v2)) => v1.partial_cmp(v2),
-            (Int32(_), _) => None,
-            (Int64(v1), Int64(v2)) => v1.partial_cmp(v2),
-            (Int64(_), _) => None,
-            (UInt8(v1), UInt8(v2)) => v1.partial_cmp(v2),
-            (UInt8(_), _) => None,
-            (UInt16(v1), UInt16(v2)) => v1.partial_cmp(v2),
-            (UInt16(_), _) => None,
-            (UInt32(v1), UInt32(v2)) => v1.partial_cmp(v2),
-            (UInt32(_), _) => None,
-            (UInt64(v1), UInt64(v2)) => v1.partial_cmp(v2),
-            (UInt64(_), _) => None,
-            (Utf8(v1), Utf8(v2)) => v1.partial_cmp(v2),
-            (Utf8(_), _) => None,
-            (LargeUtf8(v1), LargeUtf8(v2)) => v1.partial_cmp(v2),
-            (LargeUtf8(_), _) => None,
-            (Binary(v1), Binary(v2)) => v1.partial_cmp(v2),
-            (Binary(_), _) => None,
-            (LargeBinary(v1), LargeBinary(v2)) => v1.partial_cmp(v2),
-            (LargeBinary(_), _) => None,
-            (List(v1, t1), List(v2, t2)) => {
-                if t1.eq(t2) {
-                    v1.partial_cmp(v2)
-                } else {
-                    None
-                }
-            }
-            (List(_, _), _) => None,
-            (Date32(v1), Date32(v2)) => v1.partial_cmp(v2),
-            (Date32(_), _) => None,
-            (Date64(v1), Date64(v2)) => v1.partial_cmp(v2),
-            (Date64(_), _) => None,
-            (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.partial_cmp(v2),
-            (TimestampSecond(_, _), _) => None,
-            (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => {
-                v1.partial_cmp(v2)
-            }
-            (TimestampMillisecond(_, _), _) => None,
-            (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => {
-                v1.partial_cmp(v2)
-            }
-            (TimestampMicrosecond(_, _), _) => None,
-            (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => {
-                v1.partial_cmp(v2)
-            }
-            (TimestampNanosecond(_, _), _) => None,
-            (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.partial_cmp(v2),
-            (IntervalYearMonth(_), _) => None,
-            (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.partial_cmp(v2),
-            (IntervalDayTime(_), _) => None,
-            (IntervalMonthDayNano(v1), IntervalMonthDayNano(v2)) => v1.partial_cmp(v2),
-            (IntervalMonthDayNano(_), _) => None,
-            (Struct(v1, t1), Struct(v2, t2)) => {
-                if t1.eq(t2) {
-                    v1.partial_cmp(v2)
-                } else {
-                    None
-                }
-            }
-            (Struct(_, _), _) => None,
-        }
-    }
-}
-
-impl Eq for ScalarValue {}
-
-// manual implementation of `Hash` that uses OrderedFloat to
-// get defined behavior for floating point
-impl std::hash::Hash for ScalarValue {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        use ScalarValue::*;
-        match self {
-            Decimal128(v, p, s) => {
-                v.hash(state);
-                p.hash(state);
-                s.hash(state)
-            }
-            Boolean(v) => v.hash(state),
-            Float32(v) => {
-                let v = v.map(OrderedFloat);
-                v.hash(state)
-            }
-            Float64(v) => {
-                let v = v.map(OrderedFloat);
-                v.hash(state)
-            }
-            Int8(v) => v.hash(state),
-            Int16(v) => v.hash(state),
-            Int32(v) => v.hash(state),
-            Int64(v) => v.hash(state),
-            UInt8(v) => v.hash(state),
-            UInt16(v) => v.hash(state),
-            UInt32(v) => v.hash(state),
-            UInt64(v) => v.hash(state),
-            Utf8(v) => v.hash(state),
-            LargeUtf8(v) => v.hash(state),
-            Binary(v) => v.hash(state),
-            LargeBinary(v) => v.hash(state),
-            List(v, t) => {
-                v.hash(state);
-                t.hash(state);
-            }
-            Date32(v) => v.hash(state),
-            Date64(v) => v.hash(state),
-            TimestampSecond(v, _) => v.hash(state),
-            TimestampMillisecond(v, _) => v.hash(state),
-            TimestampMicrosecond(v, _) => v.hash(state),
-            TimestampNanosecond(v, _) => v.hash(state),
-            IntervalYearMonth(v) => v.hash(state),
-            IntervalDayTime(v) => v.hash(state),
-            IntervalMonthDayNano(v) => v.hash(state),
-            Struct(v, t) => {
-                v.hash(state);
-                t.hash(state);
-            }
-        }
-    }
-}
-
-// return the index into the dictionary values for array@index as well
-// as a reference to the dictionary values array. Returns None for the
-// index if the array is NULL at index
-#[inline]
-fn get_dict_value<K: ArrowDictionaryKeyType>(
-    array: &ArrayRef,
-    index: usize,
-) -> Result<(&ArrayRef, Option<usize>)> {
-    let dict_array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
-
-    // look up the index in the values dictionary
-    let keys_col = dict_array.keys();
-    if !keys_col.is_valid(index) {
-        return Ok((dict_array.values(), None));
+  };
+  use std::cmp::Ordering;
+  use std::sync::Arc;
+
+  #[test]
+  fn scalar_decimal_test() {
+    let decimal_value = ScalarValue::Decimal128(Some(123), 10, 1);
+    assert_eq!(DataType::Decimal(10, 1), decimal_value.get_datatype());
+    let try_into_value: i128 = decimal_value.clone().try_into().unwrap();
+    assert_eq!(123_i128, try_into_value);
+    assert!(!decimal_value.is_null());
+    let neg_decimal_value = decimal_value.arithmetic_negate();
+    match neg_decimal_value {
+      ScalarValue::Decimal128(v, _, _) => {
+        assert_eq!(-123, v.unwrap());
+      }
+      _ => {
+        unreachable!();
+      }
     }
-    let values_index = keys_col.value(index).to_usize().ok_or_else(|| {
-        DataFusionError::Internal(format!(
-            "Can not convert index to usize in dictionary of type creating group by value {:?}",
-            keys_col.data_type()
-        ))
-    })?;
-
-    Ok((dict_array.values(), Some(values_index)))
-}
 
-macro_rules! typed_cast_tz {
-    ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident, $TZ:expr) => {{
-        let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
-        ScalarValue::$SCALAR(
-            match array.is_null($index) {
-                true => None,
-                false => Some(array.value($index).into()),
-            },
-            $TZ.clone(),
-        )
+    // decimal scalar to array
+    let array = decimal_value.to_array();
+    let array = array.as_any().downcast_ref::<DecimalArray>().unwrap();
+    assert_eq!(1, array.len());
+    assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
+    assert_eq!(123i128, array.value(0));
+
+    // decimal scalar to array with size
+    let array = decimal_value.to_array_of_size(10);
+    let array_decimal = array.as_any().downcast_ref::<DecimalArray>().unwrap();
+    assert_eq!(10, array.len());
+    assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
+    assert_eq!(123i128, array_decimal.value(0));
+    assert_eq!(123i128, array_decimal.value(9));
+    // test eq array
+    assert!(decimal_value.eq_array(&array, 1));
+    assert!(decimal_value.eq_array(&array, 5));
+    // test try from array
+    assert_eq!(
+      decimal_value,
+      ScalarValue::try_from_array(&array, 5).unwrap()
+    );
+
+    assert_eq!(
+      decimal_value,
+      ScalarValue::try_new_decimal128(123, 10, 1).unwrap()
+    );
+
+    // test compare
+    let left = ScalarValue::Decimal128(Some(123), 10, 2);
+    let right = ScalarValue::Decimal128(Some(124), 10, 2);
+    assert!(!left.eq(&right));
+    let result = left < right;
+    assert!(result);
+    let result = left <= right;
+    assert!(result);
+    let right = ScalarValue::Decimal128(Some(124), 10, 3);
+    // make sure that two decimals with diff datatype can't be compared.
+    let result = left.partial_cmp(&right);
+    assert_eq!(None, result);
+
+    let decimal_vec = vec![
+      ScalarValue::Decimal128(Some(1), 10, 2),
+      ScalarValue::Decimal128(Some(2), 10, 2),
+      ScalarValue::Decimal128(Some(3), 10, 2),
+    ];
+    // convert the vec to decimal array and check the result
+    let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap();
+    assert_eq!(3, array.len());
+    assert_eq!(DataType::Decimal(10, 2), array.data_type().clone());
+
+    let decimal_vec = vec![
+      ScalarValue::Decimal128(Some(1), 10, 2),
+      ScalarValue::Decimal128(Some(2), 10, 2),
+      ScalarValue::Decimal128(Some(3), 10, 2),
+      ScalarValue::Decimal128(None, 10, 2),
+    ];
+    let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap();
+    assert_eq!(4, array.len());
+    assert_eq!(DataType::Decimal(10, 2), array.data_type().clone());
+
+    assert!(ScalarValue::try_new_decimal128(1, 10, 2)
+      .unwrap()
+      .eq_array(&array, 0));
+    assert!(ScalarValue::try_new_decimal128(2, 10, 2)
+      .unwrap()
+      .eq_array(&array, 1));
+    assert!(ScalarValue::try_new_decimal128(3, 10, 2)
+      .unwrap()
+      .eq_array(&array, 2));
+    assert_eq!(
+      ScalarValue::Decimal128(None, 10, 2),
+      ScalarValue::try_from_array(&array, 3).unwrap()
+    );
+    assert_eq!(
+      ScalarValue::Decimal128(None, 10, 2),
+      ScalarValue::try_from_array(&array, 4).unwrap()
+    );
+  }
+
+  #[test]
+  fn scalar_value_to_array_u64() {
+    let value = ScalarValue::UInt64(Some(13u64));
+    let array = value.to_array();
+    let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
+    assert_eq!(array.len(), 1);
+    assert!(!array.is_null(0));
+    assert_eq!(array.value(0), 13);
+
+    let value = ScalarValue::UInt64(None);
+    let array = value.to_array();
+    let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
+    assert_eq!(array.len(), 1);
+    assert!(array.is_null(0));
+  }
+
+  #[test]
+  fn scalar_value_to_array_u32() {
+    let value = ScalarValue::UInt32(Some(13u32));
+    let array = value.to_array();
+    let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
+    assert_eq!(array.len(), 1);
+    assert!(!array.is_null(0));
+    assert_eq!(array.value(0), 13);
+
+    let value = ScalarValue::UInt32(None);
+    let array = value.to_array();
+    let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
+    assert_eq!(array.len(), 1);
+    assert!(array.is_null(0));
+  }
+
+  #[test]
+  fn scalar_list_null_to_array() {
+    let list_array_ref = ScalarValue::List(None, Box::new(DataType::UInt64)).to_array();
+    let list_array = list_array_ref.as_any().downcast_ref::<ListArray>().unwrap();
+
+    assert!(list_array.is_null(0));
+    assert_eq!(list_array.len(), 1);
+    assert_eq!(list_array.values().len(), 0);
+  }
+
+  #[test]
+  fn scalar_list_to_array() {
+    let list_array_ref = ScalarValue::List(
+      Some(Box::new(vec![
+        ScalarValue::UInt64(Some(100)),
+        ScalarValue::UInt64(None),
+        ScalarValue::UInt64(Some(101)),
+      ])),
+      Box::new(DataType::UInt64),
+    )
+    .to_array();
+
+    let list_array = list_array_ref.as_any().downcast_ref::<ListArray>().unwrap();
+    assert_eq!(list_array.len(), 1);
+    assert_eq!(list_array.values().len(), 3);
+
+    let prim_array_ref = list_array.value(0);
+    let prim_array = prim_array_ref
+      .as_any()
+      .downcast_ref::<UInt64Array>()
+      .unwrap();
+    assert_eq!(prim_array.len(), 3);
+    assert_eq!(prim_array.value(0), 100);
+    assert!(prim_array.is_null(1));
+    assert_eq!(prim_array.value(2), 101);
+  }
+
+  /// Creates array directly and via ScalarValue and ensures they are the same
+  macro_rules! check_scalar_iter {
+    ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
+      let scalars: Vec<_> = $INPUT.iter().map(|v| ScalarValue::$SCALAR_T(*v)).collect();
+
+      let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
+
+      let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
+
+      assert_eq!(&array, &expected);
     }};
-}
+  }
 
-macro_rules! typed_cast {
-    ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{
-        let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
-        ScalarValue::$SCALAR(match array.is_null($index) {
-            true => None,
-            false => Some(array.value($index).into()),
-        })
-    }};
-}
+  /// Creates array directly and via ScalarValue and ensures they are the same
+  /// but for variants that carry a timezone field.
+  macro_rules! check_scalar_iter_tz {
+    ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
+      let scalars: Vec<_> = $INPUT
+        .iter()
+        .map(|v| ScalarValue::$SCALAR_T(*v, None))
+        .collect();
 
-macro_rules! build_list {
-    ($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
-        match $VALUES {
-            // the return on the macro is necessary, to short-circuit and return ArrayRef
-            None => {
-                return new_null_array(
-                    &DataType::List(Box::new(Field::new(
-                        "item",
-                        DataType::$SCALAR_TY,
-                        true,
-                    ))),
-                    $SIZE,
-                )
-            }
-            Some(values) => {
-                build_values_list!($VALUE_BUILDER_TY, $SCALAR_TY, values.as_ref(), $SIZE)
-            }
-        }
-    }};
-}
+      let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
 
-macro_rules! build_timestamp_list {
-    ($TIME_UNIT:expr, $TIME_ZONE:expr, $VALUES:expr, $SIZE:expr) => {{
-        match $VALUES {
-            // the return on the macro is necessary, to short-circuit and return ArrayRef
-            None => {
-                return new_null_array(
-                    &DataType::List(Box::new(Field::new(
-                        "item",
-                        DataType::Timestamp($TIME_UNIT, $TIME_ZONE),
-                        true,
-                    ))),
-                    $SIZE,
-                )
-            }
-            Some(values) => {
-                let values = values.as_ref();
-                match $TIME_UNIT {
-                    TimeUnit::Second => build_values_list_tz!(
-                        TimestampSecondBuilder,
-                        TimestampSecond,
-                        values,
-                        $SIZE
-                    ),
-                    TimeUnit::Microsecond => build_values_list_tz!(
-                        TimestampMillisecondBuilder,
-                        TimestampMillisecond,
-                        values,
-                        $SIZE
-                    ),
-                    TimeUnit::Millisecond => build_values_list_tz!(
-                        TimestampMicrosecondBuilder,
-                        TimestampMicrosecond,
-                        values,
-                        $SIZE
-                    ),
-                    TimeUnit::Nanosecond => build_values_list_tz!(
-                        TimestampNanosecondBuilder,
-                        TimestampNanosecond,
-                        values,
-                        $SIZE
-                    ),
-                }
-            }
-        }
-    }};
-}
+      let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
 
-macro_rules! build_values_list {
-    ($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
-        let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len()));
-
-        for _ in 0..$SIZE {
-            for scalar_value in $VALUES {
-                match scalar_value {
-                    ScalarValue::$SCALAR_TY(Some(v)) => {
-                        builder.values().append_value(v.clone()).unwrap()
-                    }
-                    ScalarValue::$SCALAR_TY(None) => {
-                        builder.values().append_null().unwrap();
-                    }
-                    _ => panic!("Incompatible ScalarValue for list"),
-                };
-            }
-            builder.append(true).unwrap();
-        }
-
-        builder.finish()
+      assert_eq!(&array, &expected);
     }};
-}
+  }
 
-macro_rules! build_values_list_tz {
-    ($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
-        let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len()));
-
-        for _ in 0..$SIZE {
-            for scalar_value in $VALUES {
-                match scalar_value {
-                    ScalarValue::$SCALAR_TY(Some(v), _) => {
-                        builder.values().append_value(v.clone()).unwrap()
-                    }
-                    ScalarValue::$SCALAR_TY(None, _) => {
-                        builder.values().append_null().unwrap();
-                    }
-                    _ => panic!("Incompatible ScalarValue for list"),
-                };
-            }
-            builder.append(true).unwrap();
-        }
+  /// Creates array directly and via ScalarValue and ensures they
+  /// are the same, for string  arrays
+  macro_rules! check_scalar_iter_string {
+    ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
+      let scalars: Vec<_> = $INPUT
+        .iter()
+        .map(|v| ScalarValue::$SCALAR_T(v.map(|v| v.to_string())))
+        .collect();
 
-        builder.finish()
-    }};
-}
+      let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
 
-macro_rules! build_array_from_option {
-    ($DATA_TYPE:ident, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{
-        match $EXPR {
-            Some(value) => Arc::new($ARRAY_TYPE::from_value(*value, $SIZE)),
-            None => new_null_array(&DataType::$DATA_TYPE, $SIZE),
-        }
-    }};
-    ($DATA_TYPE:ident, $ENUM:expr, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{
-        match $EXPR {
-            Some(value) => Arc::new($ARRAY_TYPE::from_value(*value, $SIZE)),
-            None => new_null_array(&DataType::$DATA_TYPE($ENUM), $SIZE),
-        }
-    }};
-    ($DATA_TYPE:ident, $ENUM:expr, $ENUM2:expr, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{
-        match $EXPR {
-            Some(value) => {
-                let array: ArrayRef = Arc::new($ARRAY_TYPE::from_value(*value, $SIZE));
-                // Need to call cast to cast to final data type with timezone/extra param
-                cast(&array, &DataType::$DATA_TYPE($ENUM, $ENUM2))
-                    .expect("cannot do temporal cast")
-            }
-            None => new_null_array(&DataType::$DATA_TYPE($ENUM, $ENUM2), $SIZE),
-        }
-    }};
-}
+      let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
 
-macro_rules! eq_array_primitive {
-    ($array:expr, $index:expr, $ARRAYTYPE:ident, $VALUE:expr) => {{
-        let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
-        let is_valid = array.is_valid($index);
-        match $VALUE {
-            Some(val) => is_valid && &array.value($index) == val,
-            None => !is_valid,
-        }
+      assert_eq!(&array, &expected);
     }};
-}
+  }
 
-impl ScalarValue {
-    /// Create a decimal Scalar from value/precision and scale.
-    pub fn try_new_decimal128(
-        value: i128,
-        precision: usize,
-        scale: usize,
-    ) -> Result<Self> {
-        // make sure the precision and scale is valid
-        if precision <= MAX_PRECISION_FOR_DECIMAL128 && scale <= precision {
-            return Ok(ScalarValue::Decimal128(Some(value), precision, scale));
-        }
-        return Err(DataFusionError::Internal(format!(
-            "Can not new a decimal type ScalarValue for precision {} and scale {}",
-            precision, scale
-        )));
-    }
-    /// Getter for the `DataType` of the value
-    pub fn get_datatype(&self) -> DataType {
-        match self {
-            ScalarValue::Boolean(_) => DataType::Boolean,
-            ScalarValue::UInt8(_) => DataType::UInt8,
-            ScalarValue::UInt16(_) => DataType::UInt16,
-            ScalarValue::UInt32(_) => DataType::UInt32,
-            ScalarValue::UInt64(_) => DataType::UInt64,
-            ScalarValue::Int8(_) => DataType::Int8,
-            ScalarValue::Int16(_) => DataType::Int16,
-            ScalarValue::Int32(_) => DataType::Int32,
-            ScalarValue::Int64(_) => DataType::Int64,
-            ScalarValue::Decimal128(_, precision, scale) => {
-                DataType::Decimal(*precision, *scale)
-            }
-            ScalarValue::TimestampSecond(_, tz_opt) => {
-                DataType::Timestamp(TimeUnit::Second, tz_opt.clone())
-            }
-            ScalarValue::TimestampMillisecond(_, tz_opt) => {
-                DataType::Timestamp(TimeUnit::Millisecond, tz_opt.clone())
-            }
-            ScalarValue::TimestampMicrosecond(_, tz_opt) => {
-                DataType::Timestamp(TimeUnit::Microsecond, tz_opt.clone())
-            }
-            ScalarValue::TimestampNanosecond(_, tz_opt) => {
-                DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
-            }
-            ScalarValue::Float32(_) => DataType::Float32,
-            ScalarValue::Float64(_) => DataType::Float64,
-            ScalarValue::Utf8(_) => DataType::Utf8,
-            ScalarValue::LargeUtf8(_) => DataType::LargeUtf8,
-            ScalarValue::Binary(_) => DataType::Binary,
-            ScalarValue::LargeBinary(_) => DataType::LargeBinary,
-            ScalarValue::List(_, data_type) => DataType::List(Box::new(Field::new(
-                "item",
-                data_type.as_ref().clone(),
-                true,
-            ))),
-            ScalarValue::Date32(_) => DataType::Date32,
-            ScalarValue::Date64(_) => DataType::Date64,
-            ScalarValue::IntervalYearMonth(_) => {
-                DataType::Interval(IntervalUnit::YearMonth)
-            }
-            ScalarValue::IntervalDayTime(_) => DataType::Interval(IntervalUnit::DayTime),
-            ScalarValue::IntervalMonthDayNano(_) => {
-                DataType::Interval(IntervalUnit::MonthDayNano)
-            }
-            ScalarValue::Struct(_, fields) => DataType::Struct(fields.as_ref().clone()),
-        }
-    }
+  /// Creates array directly and via ScalarValue and ensures they
+  /// are the same, for binary arrays
+  macro_rules! check_scalar_iter_binary {
+    ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
+      let scalars: Vec<_> = $INPUT
+        .iter()
+        .map(|v| ScalarValue::$SCALAR_T(v.map(|v| v.to_vec())))
+        .collect();
 
-    /// Calculate arithmetic negation for a scalar value
-    pub fn arithmetic_negate(&self) -> Self {
-        match self {
-            ScalarValue::Boolean(None)
-            | ScalarValue::Int8(None)
-            | ScalarValue::Int16(None)
-            | ScalarValue::Int32(None)
-            | ScalarValue::Int64(None)
-            | ScalarValue::Float32(None) => self.clone(),
-            ScalarValue::Float64(Some(v)) => ScalarValue::Float64(Some(-v)),
-            ScalarValue::Float32(Some(v)) => ScalarValue::Float32(Some(-v)),
-            ScalarValue::Int8(Some(v)) => ScalarValue::Int8(Some(-v)),
-            ScalarValue::Int16(Some(v)) => ScalarValue::Int16(Some(-v)),
-            ScalarValue::Int32(Some(v)) => ScalarValue::Int32(Some(-v)),
-            ScalarValue::Int64(Some(v)) => ScalarValue::Int64(Some(-v)),
-            ScalarValue::Decimal128(Some(v), precision, scale) => {
-                ScalarValue::Decimal128(Some(-v), *precision, *scale)
-            }
-            _ => panic!("Cannot run arithmetic negate on scalar value: {:?}", self),
-        }
-    }
+      let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
 
-    /// whether this value is null or not.
-    pub fn is_null(&self) -> bool {
-        matches!(
-            *self,
-            ScalarValue::Boolean(None)
-                | ScalarValue::UInt8(None)
-                | ScalarValue::UInt16(None)
-                | ScalarValue::UInt32(None)
-                | ScalarValue::UInt64(None)
-                | ScalarValue::Int8(None)
-                | ScalarValue::Int16(None)
-                | ScalarValue::Int32(None)
-                | ScalarValue::Int64(None)
-                | ScalarValue::Float32(None)
-                | ScalarValue::Float64(None)
-                | ScalarValue::Date32(None)
-                | ScalarValue::Date64(None)
-                | ScalarValue::Utf8(None)
-                | ScalarValue::LargeUtf8(None)
-                | ScalarValue::List(None, _)
-                | ScalarValue::TimestampSecond(None, _)
-                | ScalarValue::TimestampMillisecond(None, _)
-                | ScalarValue::TimestampMicrosecond(None, _)
-                | ScalarValue::TimestampNanosecond(None, _)
-                | ScalarValue::Struct(None, _)
-                | ScalarValue::Decimal128(None, _, _) // For decimal type, the value is null means ScalarValue::Decimal128 is null.
-        )
-    }
+      let expected: $ARRAYTYPE = $INPUT.iter().map(|v| v.map(|v| v.to_vec())).collect();
 
-    /// Converts a scalar value into an 1-row array.
-    pub fn to_array(&self) -> ArrayRef {
-        self.to_array_of_size(1)
-    }
+      let expected: ArrayRef = Arc::new(expected);
 
-    /// Converts an iterator of references [`ScalarValue`] into an [`ArrayRef`]
-    /// corresponding to those values. For example,
-    ///
-    /// Returns an error if the iterator is empty or if the
-    /// [`ScalarValue`]s are not all the same type
-    ///
-    /// Example
-    /// ```
-    /// use datafusion::scalar::ScalarValue;
-    /// use arrow::array::{ArrayRef, BooleanArray};
-    ///
-    /// let scalars = vec![
-    ///   ScalarValue::Boolean(Some(true)),
-    ///   ScalarValue::Boolean(None),
-    ///   ScalarValue::Boolean(Some(false)),
-    /// ];
-    ///
-    /// // Build an Array from the list of ScalarValues
-    /// let array = ScalarValue::iter_to_array(scalars.into_iter())
-    ///   .unwrap();
-    ///
-    /// let expected: ArrayRef = std::sync::Arc::new(
-    ///   BooleanArray::from(vec![
-    ///     Some(true),
-    ///     None,
-    ///     Some(false)
-    ///   ]
-    /// ));
-    ///
-    /// assert_eq!(&array, &expected);
-    /// ```
-    pub fn iter_to_array(
-        scalars: impl IntoIterator<Item = ScalarValue>,
-    ) -> Result<ArrayRef> {
-        let mut scalars = scalars.into_iter().peekable();
-
-        // figure out the type based on the first element
-        let data_type = match scalars.peek() {
-            None => {
-                return Err(DataFusionError::Internal(
-                    "Empty iterator passed to ScalarValue::iter_to_array".to_string(),
-                ));
-            }
-            Some(sv) => sv.get_datatype(),
-        };
-
-        /// Creates an array of $ARRAY_TY by unpacking values of
-        /// SCALAR_TY for primitive types
-        macro_rules! build_array_primitive {
-            ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{
-                {
-                    let array = scalars
-                        .map(|sv| {
-                            if let ScalarValue::$SCALAR_TY(v) = sv {
-                                Ok(v)
-                            } else {
-                                Err(DataFusionError::Internal(format!(
-                                    "Inconsistent types in ScalarValue::iter_to_array. \
-                                     Expected {:?}, got {:?}",
-                                    data_type, sv
-                                )))
-                            }
-                        })
-                        .collect::<Result<$ARRAY_TY>>()?;
-
-                    Arc::new(array)
-                }
-            }};
-        }
-
-        macro_rules! build_array_primitive_tz {
-            ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{
-                {
-                    let array = scalars
-                        .map(|sv| {
-                            if let ScalarValue::$SCALAR_TY(v, _) = sv {
-                                Ok(v)
-                            } else {
-                                Err(DataFusionError::Internal(format!(
-                                    "Inconsistent types in ScalarValue::iter_to_array. \
-                                     Expected {:?}, got {:?}",
-                                    data_type, sv
-                                )))
-                            }
-                        })
-                        .collect::<Result<$ARRAY_TY>>()?;
-
-                    Arc::new(array)
-                }
-            }};
-        }
-
-        /// Creates an array of $ARRAY_TY by unpacking values of
-        /// SCALAR_TY for "string-like" types.
-        macro_rules! build_array_string {
-            ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{
-                {
-                    let array = scalars
-                        .map(|sv| {
-                            if let ScalarValue::$SCALAR_TY(v) = sv {
-                                Ok(v)
-                            } else {
-                                Err(DataFusionError::Internal(format!(
-                                    "Inconsistent types in ScalarValue::iter_to_array. \
-                                     Expected {:?}, got {:?}",
-                                    data_type, sv
-                                )))
-                            }
-                        })
-                        .collect::<Result<$ARRAY_TY>>()?;
-                    Arc::new(array)
-                }
-            }};
-        }
-
-        macro_rules! build_array_list_primitive {
-            ($ARRAY_TY:ident, $SCALAR_TY:ident, $NATIVE_TYPE:ident) => {{
-                Arc::new(ListArray::from_iter_primitive::<$ARRAY_TY, _, _>(
-                    scalars.into_iter().map(|x| match x {
-                        ScalarValue::List(xs, _) => xs.map(|x| {
-                            x.iter()
-                                .map(|x| match x {
-                                    ScalarValue::$SCALAR_TY(i) => *i,
-                                    sv => panic!("Inconsistent types in ScalarValue::iter_to_array. \
-                                    Expected {:?}, got {:?}", data_type, sv),
-                                })
-                                .collect::<Vec<Option<$NATIVE_TYPE>>>()
-                        }),
-                        sv => panic!("Inconsistent types in ScalarValue::iter_to_array. \
-                        Expected {:?}, got {:?}", data_type, sv),
-        }),
-                ))
-            }};
-        }
-
-        macro_rules! build_array_list_string {
-            ($BUILDER:ident, $SCALAR_TY:ident) => {{
-                let mut builder = ListBuilder::new($BUILDER::new(0));
-
-                for scalar in scalars.into_iter() {
-                    match scalar {
-                        ScalarValue::List(Some(xs), _) => {
-                            let xs = *xs;
-                            for s in xs {
-                                match s {
-                                    ScalarValue::$SCALAR_TY(Some(val)) => {
-                                        builder.values().append_value(val)?;
-                                    }
-                                    ScalarValue::$SCALAR_TY(None) => {
-                                        builder.values().append_null()?;
-                                    }
-                                    sv => return Err(DataFusionError::Internal(format!(
-                                        "Inconsistent types in ScalarValue::iter_to_array. \
-                                         Expected Utf8, got {:?}",
-                                        sv
-                                    ))),
-                                }
-                            }
-                            builder.append(true)?;
-                        }
-                        ScalarValue::List(None, _) => {
-                            builder.append(false)?;
-                        }
-                        sv => {
-                            return Err(DataFusionError::Internal(format!(
-                                "Inconsistent types in ScalarValue::iter_to_array. \
-                             Expected List, got {:?}",
-                                sv
-                            )))
-                        }
-                    }
-                }
-
-                Arc::new(builder.finish())
-
-            }}
-        }
-
-        let array: ArrayRef = match &data_type {
-            DataType::Decimal(precision, scale) => {
-                let decimal_array =
-                    ScalarValue::iter_to_decimal_array(scalars, precision, scale)?;
-                Arc::new(decimal_array)
-            }
-            DataType::Boolean => build_array_primitive!(BooleanArray, Boolean),
-            DataType::Float32 => build_array_primitive!(Float32Array, Float32),
-            DataType::Float64 => build_array_primitive!(Float64Array, Float64),
-            DataType::Int8 => build_array_primitive!(Int8Array, Int8),
-            DataType::Int16 => build_array_primitive!(Int16Array, Int16),
-            DataType::Int32 => build_array_primitive!(Int32Array, Int32),
-            DataType::Int64 => build_array_primitive!(Int64Array, Int64),
-            DataType::UInt8 => build_array_primitive!(UInt8Array, UInt8),
-            DataType::UInt16 => build_array_primitive!(UInt16Array, UInt16),
-            DataType::UInt32 => build_array_primitive!(UInt32Array, UInt32),
-            DataType::UInt64 => build_array_primitive!(UInt64Array, UInt64),
-            DataType::Utf8 => build_array_string!(StringArray, Utf8),
-            DataType::LargeUtf8 => build_array_string!(LargeStringArray, LargeUtf8),
-            DataType::Binary => build_array_string!(BinaryArray, Binary),
-            DataType::LargeBinary => build_array_string!(LargeBinaryArray, LargeBinary),
-            DataType::Date32 => build_array_primitive!(Date32Array, Date32),
-            DataType::Date64 => build_array_primitive!(Date64Array, Date64),
-            DataType::Timestamp(TimeUnit::Second, _) => {
-                build_array_primitive_tz!(TimestampSecondArray, TimestampSecond)
-            }
-            DataType::Timestamp(TimeUnit::Millisecond, _) => {
-                build_array_primitive_tz!(TimestampMillisecondArray, TimestampMillisecond)
-            }
-            DataType::Timestamp(TimeUnit::Microsecond, _) => {
-                build_array_primitive_tz!(TimestampMicrosecondArray, TimestampMicrosecond)
-            }
-            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-                build_array_primitive_tz!(TimestampNanosecondArray, TimestampNanosecond)
-            }
-            DataType::Interval(IntervalUnit::DayTime) => {
-                build_array_primitive!(IntervalDayTimeArray, IntervalDayTime)
-            }
-            DataType::Interval(IntervalUnit::YearMonth) => {
-                build_array_primitive!(IntervalYearMonthArray, IntervalYearMonth)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::Int8 => {
-                build_array_list_primitive!(Int8Type, Int8, i8)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::Int16 => {
-                build_array_list_primitive!(Int16Type, Int16, i16)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::Int32 => {
-                build_array_list_primitive!(Int32Type, Int32, i32)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::Int64 => {
-                build_array_list_primitive!(Int64Type, Int64, i64)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::UInt8 => {
-                build_array_list_primitive!(UInt8Type, UInt8, u8)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::UInt16 => {
-                build_array_list_primitive!(UInt16Type, UInt16, u16)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::UInt32 => {
-                build_array_list_primitive!(UInt32Type, UInt32, u32)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::UInt64 => {
-                build_array_list_primitive!(UInt64Type, UInt64, u64)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::Float32 => {
-                build_array_list_primitive!(Float32Type, Float32, f32)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::Float64 => {
-                build_array_list_primitive!(Float64Type, Float64, f64)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::Utf8 => {
-                build_array_list_string!(StringBuilder, Utf8)
-            }
-            DataType::List(fields) if fields.data_type() == &DataType::LargeUtf8 => {
-                build_array_list_string!(LargeStringBuilder, LargeUtf8)
-            }
-            DataType::List(_) => {
-                // Fallback case handling homogeneous lists with any ScalarValue element type
-                let list_array = ScalarValue::iter_to_array_list(scalars, &data_type)?;
-                Arc::new(list_array)
-            }
-            DataType::Struct(fields) => {
-                // Initialize a Vector to store the ScalarValues for each column
-                let mut columns: Vec<Vec<ScalarValue>> =
-                    (0..fields.len()).map(|_| Vec::new()).collect();
-
-                // Iterate over scalars to populate the column scalars for each row
-                for scalar in scalars {
-                    if let ScalarValue::Struct(values, fields) = scalar {
-                        match values {
-                            Some(values) => {
-                                // Push value for each field
-                                for c in 0..columns.len() {
-                                    let column = columns.get_mut(c).unwrap();
-                                    column.push(values[c].clone());
-                                }
-                            }
-                            None => {
-                                // Push NULL of the appropriate type for each field
-                                for c in 0..columns.len() {
-                                    let dtype = fields[c].data_type();
-                                    let column = columns.get_mut(c).unwrap();
-                                    column.push(ScalarValue::try_from(dtype)?);
-                                }
-                            }
-                        };
-                    } else {
-                        return Err(DataFusionError::Internal(format!(
-                            "Expected Struct but found: {}",
-                            scalar
-                        )));
-                    };
-                }
-
-                // Call iter_to_array recursively to convert the scalars for each column into Arrow arrays
-                let field_values = fields
-                    .iter()
-                    .zip(columns)
-                    .map(|(field, column)| -> Result<(Field, ArrayRef)> {
-                        Ok((field.clone(), Self::iter_to_array(column)?))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                Arc::new(StructArray::from(field_values))
-            }
-            _ => {
-                return Err(DataFusionError::Internal(format!(
-                    "Unsupported creation of {:?} array from ScalarValue {:?}",
-                    data_type,
-                    scalars.peek()
-                )));
-            }
-        };
-
-        Ok(array)
-    }
-
-    fn iter_to_decimal_array(
-        scalars: impl IntoIterator<Item = ScalarValue>,
-        precision: &usize,
-        scale: &usize,
-    ) -> Result<DecimalArray> {
-        // collect the value as Option<i128>
-        let array = scalars
-            .into_iter()
-            .map(|element: ScalarValue| match element {
-                ScalarValue::Decimal128(v1, _, _) => v1,
-                _ => unreachable!(),
-            })
-            .collect::<Vec<Option<i128>>>();
-
-        // build the decimal array using the Decimal Builder
-        let mut builder = DecimalBuilder::new(array.len(), *precision, *scale);
-        array.iter().for_each(|element| match element {
-            None => {
-                builder.append_null().unwrap();
-            }
-            Some(v) => {
-                builder.append_value(*v).unwrap();
-            }
-        });
-        Ok(builder.finish())
-    }
-
-    fn iter_to_array_list(
-        scalars: impl IntoIterator<Item = ScalarValue>,
-        data_type: &DataType,
-    ) -> Result<GenericListArray<i32>> {
-        let mut offsets = Int32Array::builder(0);
-        if let Err(err) = offsets.append_value(0) {
-            return Err(DataFusionError::ArrowError(err));
-        }
-
-        let mut elements: Vec<ArrayRef> = Vec::new();
-        let mut valid = BooleanBufferBuilder::new(0);
-        let mut flat_len = 0i32;
-        for scalar in scalars {
-            if let ScalarValue::List(values, _) = scalar {
-                match values {
-                    Some(values) => {
-                        let element_array = ScalarValue::iter_to_array(*values)?;
-
-                        // Add new offset index
-                        flat_len += element_array.len() as i32;
-                        if let Err(err) = offsets.append_value(flat_len) {
-                            return Err(DataFusionError::ArrowError(err));
-                        }
-
-                        elements.push(element_array);
-
-                        // Element is valid
-                        valid.append(true);
-                    }
-                    None => {
-                        // Repeat previous offset index
-                        if let Err(err) = offsets.append_value(flat_len) {
-                            return Err(DataFusionError::ArrowError(err));
-                        }
-
-                        // Element is null
-                        valid.append(false);
-                    }
-                }
-            } else {
-                return Err(DataFusionError::Internal(format!(
-                    "Expected ScalarValue::List element. Received {:?}",
-                    scalar
-                )));
-            }
-        }
-
-        // Concatenate element arrays to create single flat array
-        let element_arrays: Vec<&dyn Array> =
-            elements.iter().map(|a| a.as_ref()).collect();
-        let flat_array = match arrow::compute::concat(&element_arrays) {
-            Ok(flat_array) => flat_array,
-            Err(err) => return Err(DataFusionError::ArrowError(err)),
-        };
-
-        // Build ListArray using ArrayData so we can specify a flat inner array, and offset indices
-        let offsets_array = offsets.finish();
-        let array_data = ArrayDataBuilder::new(data_type.clone())
-            .len(offsets_array.len() - 1)
-            .null_bit_buffer(valid.finish())
-            .add_buffer(offsets_array.data().buffers()[0].clone())
-            .add_child_data(flat_array.data().clone());
-
-        let list_array = ListArray::from(array_data.build()?);
-        Ok(list_array)
-    }
-
-    fn build_decimal_array(
-        value: &Option<i128>,
-        precision: &usize,
-        scale: &usize,
-        size: usize,
-    ) -> DecimalArray {
-        let mut builder = DecimalBuilder::new(size, *precision, *scale);
-        match value {
-            None => {
-                for _i in 0..size {
-                    builder.append_null().unwrap();
-                }
-            }
-            Some(v) => {
-                let v = *v;
-                for _i in 0..size {
-                    builder.append_value(v).unwrap();
-                }
-            }
-        };
-        builder.finish()
-    }
-
-    /// Converts a scalar value into an array of `size` rows.
-    pub fn to_array_of_size(&self, size: usize) -> ArrayRef {
-        match self {
-            ScalarValue::Decimal128(e, precision, scale) => {
-                Arc::new(ScalarValue::build_decimal_array(e, precision, scale, size))
-            }
-            ScalarValue::Boolean(e) => {
-                Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef
-            }
-            ScalarValue::Float64(e) => {
-                build_array_from_option!(Float64, Float64Array, e, size)
-            }
-            ScalarValue::Float32(e) => {
-                build_array_from_option!(Float32, Float32Array, e, size)
-            }
-            ScalarValue::Int8(e) => build_array_from_option!(Int8, Int8Array, e, size),
-            ScalarValue::Int16(e) => build_array_from_option!(Int16, Int16Array, e, size),
-            ScalarValue::Int32(e) => build_array_from_option!(Int32, Int32Array, e, size),
-            ScalarValue::Int64(e) => build_array_from_option!(Int64, Int64Array, e, size),
-            ScalarValue::UInt8(e) => build_array_from_option!(UInt8, UInt8Array, e, size),
-            ScalarValue::UInt16(e) => {
-                build_array_from_option!(UInt16, UInt16Array, e, size)
-            }
-            ScalarValue::UInt32(e) => {
-                build_array_from_option!(UInt32, UInt32Array, e, size)
-            }
-            ScalarValue::UInt64(e) => {
-                build_array_from_option!(UInt64, UInt64Array, e, size)
-            }
-            ScalarValue::TimestampSecond(e, tz_opt) => build_array_from_option!(
-                Timestamp,
-                TimeUnit::Second,
-                tz_opt.clone(),
-                TimestampSecondArray,
-                e,
-                size
-            ),
-            ScalarValue::TimestampMillisecond(e, tz_opt) => build_array_from_option!(
-                Timestamp,
-                TimeUnit::Millisecond,
-                tz_opt.clone(),
-                TimestampMillisecondArray,
-                e,
-                size
-            ),
-
-            ScalarValue::TimestampMicrosecond(e, tz_opt) => build_array_from_option!(
-                Timestamp,
-                TimeUnit::Microsecond,
-                tz_opt.clone(),
-                TimestampMicrosecondArray,
-                e,
-                size
-            ),
-            ScalarValue::TimestampNanosecond(e, tz_opt) => build_array_from_option!(
-                Timestamp,
-                TimeUnit::Nanosecond,
-                tz_opt.clone(),
-                TimestampNanosecondArray,
-                e,
-                size
-            ),
-            ScalarValue::Utf8(e) => match e {
-                Some(value) => {
-                    Arc::new(StringArray::from_iter_values(repeat(value).take(size)))
-                }
-                None => new_null_array(&DataType::Utf8, size),
-            },
-            ScalarValue::LargeUtf8(e) => match e {
-                Some(value) => {
-                    Arc::new(LargeStringArray::from_iter_values(repeat(value).take(size)))
-                }
-                None => new_null_array(&DataType::LargeUtf8, size),
-            },
-            ScalarValue::Binary(e) => match e {
-                Some(value) => Arc::new(
-                    repeat(Some(value.as_slice()))
-                        .take(size)
-                        .collect::<BinaryArray>(),
-                ),
-                None => {
-                    Arc::new(repeat(None::<&str>).take(size).collect::<BinaryArray>())
-                }
-            },
-            ScalarValue::LargeBinary(e) => match e {
-                Some(value) => Arc::new(
-                    repeat(Some(value.as_slice()))
-                        .take(size)
-                        .collect::<LargeBinaryArray>(),
-                ),
-                None => Arc::new(
-                    repeat(None::<&str>)
-                        .take(size)
-                        .collect::<LargeBinaryArray>(),
-                ),
-            },
-            ScalarValue::List(values, data_type) => Arc::new(match data_type.as_ref() {
-                DataType::Boolean => build_list!(BooleanBuilder, Boolean, values, size),
-                DataType::Int8 => build_list!(Int8Builder, Int8, values, size),
-                DataType::Int16 => build_list!(Int16Builder, Int16, values, size),
-                DataType::Int32 => build_list!(Int32Builder, Int32, values, size),
-                DataType::Int64 => build_list!(Int64Builder, Int64, values, size),
-                DataType::UInt8 => build_list!(UInt8Builder, UInt8, values, size),
-                DataType::UInt16 => build_list!(UInt16Builder, UInt16, values, size),
-                DataType::UInt32 => build_list!(UInt32Builder, UInt32, values, size),
-                DataType::UInt64 => build_list!(UInt64Builder, UInt64, values, size),
-                DataType::Utf8 => build_list!(StringBuilder, Utf8, values, size),
-                DataType::Float32 => build_list!(Float32Builder, Float32, values, size),
-                DataType::Float64 => build_list!(Float64Builder, Float64, values, size),
-                DataType::Timestamp(unit, tz) => {
-                    build_timestamp_list!(unit.clone(), tz.clone(), values, size)
-                }
-                &DataType::LargeUtf8 => {
-                    build_list!(LargeStringBuilder, LargeUtf8, values, size)
-                }
-                _ => ScalarValue::iter_to_array_list(
-                    repeat(self.clone()).take(size),
-                    &DataType::List(Box::new(Field::new(
-                        "item",
-                        data_type.as_ref().clone(),
-                        true,
-                    ))),
-                )
-                .unwrap(),
-            }),
-            ScalarValue::Date32(e) => {
-                build_array_from_option!(Date32, Date32Array, e, size)
-            }
-            ScalarValue::Date64(e) => {
-                build_array_from_option!(Date64, Date64Array, e, size)
-            }
-            ScalarValue::IntervalDayTime(e) => build_array_from_option!(
-                Interval,
-                IntervalUnit::DayTime,
-                IntervalDayTimeArray,
-                e,
-                size
-            ),
-            ScalarValue::IntervalYearMonth(e) => build_array_from_option!(
-                Interval,
-                IntervalUnit::YearMonth,
-                IntervalYearMonthArray,
-                e,
-                size
-            ),
-            ScalarValue::IntervalMonthDayNano(e) => build_array_from_option!(
-                Interval,
-                IntervalUnit::MonthDayNano,
-                IntervalMonthDayNanoArray,
-                e,
-                size
-            ),
-            ScalarValue::Struct(values, fields) => match values {
-                Some(values) => {
-                    let field_values: Vec<_> = fields
-                        .iter()
-                        .zip(values.iter())
-                        .map(|(field, value)| {
-                            (field.clone(), value.to_array_of_size(size))
-                        })
-                        .collect();
-
-                    Arc::new(StructArray::from(field_values))
-                }
-                None => {
-                    let field_values: Vec<_> = fields
-                        .iter()
-                        .map(|field| {
-                            let none_field = Self::try_from(field.data_type()).expect(
-                                "Failed to construct null ScalarValue from Struct field type"
-                            );
-                            (field.clone(), none_field.to_array_of_size(size))
-                        })
-                        .collect();
-
-                    Arc::new(StructArray::from(field_values))
-                }
-            },
-        }
-    }
-
-    fn get_decimal_value_from_array(
-        array: &ArrayRef,
-        index: usize,
-        precision: &usize,
-        scale: &usize,
-    ) -> ScalarValue {
-        let array = array.as_any().downcast_ref::<DecimalArray>().unwrap();
-        if array.is_null(index) {
-            ScalarValue::Decimal128(None, *precision, *scale)
-        } else {
-            ScalarValue::Decimal128(Some(array.value(index)), *precision, *scale)
-        }
-    }
-
-    /// Converts a value in `array` at `index` into a ScalarValue
-    pub fn try_from_array(array: &ArrayRef, index: usize) -> Result<Self> {
-        // handle NULL value
-        if !array.is_valid(index) {
-            return array.data_type().try_into();
-        }
-
-        Ok(match array.data_type() {
-            DataType::Decimal(precision, scale) => {
-                ScalarValue::get_decimal_value_from_array(array, index, precision, scale)
-            }
-            DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean),
-            DataType::Float64 => typed_cast!(array, index, Float64Array, Float64),
-            DataType::Float32 => typed_cast!(array, index, Float32Array, Float32),
-            DataType::UInt64 => typed_cast!(array, index, UInt64Array, UInt64),
-            DataType::UInt32 => typed_cast!(array, index, UInt32Array, UInt32),
-            DataType::UInt16 => typed_cast!(array, index, UInt16Array, UInt16),
-            DataType::UInt8 => typed_cast!(array, index, UInt8Array, UInt8),
-            DataType::Int64 => typed_cast!(array, index, Int64Array, Int64),
-            DataType::Int32 => typed_cast!(array, index, Int32Array, Int32),
-            DataType::Int16 => typed_cast!(array, index, Int16Array, Int16),
-            DataType::Int8 => typed_cast!(array, index, Int8Array, Int8),
-            DataType::Binary => typed_cast!(array, index, BinaryArray, Binary),
-            DataType::LargeBinary => {
-                typed_cast!(array, index, LargeBinaryArray, LargeBinary)
-            }
-            DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8),
-            DataType::LargeUtf8 => typed_cast!(array, index, LargeStringArray, LargeUtf8),
-            DataType::List(nested_type) => {
-                let list_array =
-                    array.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
-                        DataFusionError::Internal(
-                            "Failed to downcast ListArray".to_string(),
-                        )
-                    })?;
-                let value = match list_array.is_null(index) {
-                    true => None,
-                    false => {
-                        let nested_array = list_array.value(index);
-                        let scalar_vec = (0..nested_array.len())
-                            .map(|i| ScalarValue::try_from_array(&nested_array, i))
-                            .collect::<Result<Vec<_>>>()?;
-                        Some(scalar_vec)
-                    }
-                };
-                let value = value.map(Box::new);
-                let data_type = Box::new(nested_type.data_type().clone());
-                ScalarValue::List(value, data_type)
-            }
-            DataType::Date32 => {
-                typed_cast!(array, index, Date32Array, Date32)
-            }
-            DataType::Date64 => {
-                typed_cast!(array, index, Date64Array, Date64)
-            }
-            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
-                typed_cast_tz!(
-                    array,
-                    index,
-                    TimestampSecondArray,
-                    TimestampSecond,
-                    tz_opt
-                )
-            }
-            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
-                typed_cast_tz!(
-                    array,
-                    index,
-                    TimestampMillisecondArray,
-                    TimestampMillisecond,
-                    tz_opt
-                )
-            }
-            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
-                typed_cast_tz!(
-                    array,
-                    index,
-                    TimestampMicrosecondArray,
-                    TimestampMicrosecond,
-                    tz_opt
-                )
-            }
-            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
-                typed_cast_tz!(
-                    array,
-                    index,
-                    TimestampNanosecondArray,
-                    TimestampNanosecond,
-                    tz_opt
-                )
-            }
-            DataType::Dictionary(index_type, _) => {
-                let (values, values_index) = match **index_type {
-                    DataType::Int8 => get_dict_value::<Int8Type>(array, index)?,
-                    DataType::Int16 => get_dict_value::<Int16Type>(array, index)?,
-                    DataType::Int32 => get_dict_value::<Int32Type>(array, index)?,
-                    DataType::Int64 => get_dict_value::<Int64Type>(array, index)?,
-                    DataType::UInt8 => get_dict_value::<UInt8Type>(array, index)?,
-                    DataType::UInt16 => get_dict_value::<UInt16Type>(array, index)?,
-                    DataType::UInt32 => get_dict_value::<UInt32Type>(array, index)?,
-                    DataType::UInt64 => get_dict_value::<UInt64Type>(array, index)?,
-                    _ => {
-                        return Err(DataFusionError::Internal(format!(
-                            "Index type not supported while creating scalar from dictionary: {}",
-                            array.data_type(),
-                        )));
-                    }
-                };
-
-                match values_index {
-                    Some(values_index) => Self::try_from_array(values, values_index)?,
-                    // was null
-                    None => values.data_type().try_into()?,
-                }
-            }
-            DataType::Struct(fields) => {
-                let array =
-                    array
-                        .as_any()
-                        .downcast_ref::<StructArray>()
-                        .ok_or_else(|| {
-                            DataFusionError::Internal(
-                                "Failed to downcast ArrayRef to StructArray".to_string(),
-                            )
-                        })?;
-                let mut field_values: Vec<ScalarValue> = Vec::new();
-                for col_index in 0..array.num_columns() {
-                    let col_array = array.column(col_index);
-                    let col_scalar = ScalarValue::try_from_array(col_array, index)?;
-                    field_values.push(col_scalar);
-                }
-                Self::Struct(Some(Box::new(field_values)), Box::new(fields.clone()))
-            }
-            other => {
-                return Err(DataFusionError::NotImplemented(format!(
-                    "Can't create a scalar from array of type \"{:?}\"",
-                    other
-                )));
-            }
-        })
-    }
-
-    fn eq_array_decimal(
-        array: &ArrayRef,
-        index: usize,
-        value: &Option<i128>,
-        precision: usize,
-        scale: usize,
-    ) -> bool {
-        let array = array.as_any().downcast_ref::<DecimalArray>().unwrap();
-        if array.precision() != precision || array.scale() != scale {
-            return false;
-        }
-        match value {
-            None => array.is_null(index),
-            Some(v) => !array.is_null(index) && array.value(index) == *v,
-        }
-    }
-
-    /// Compares a single row of array @ index for equality with self,
-    /// in an optimized fashion.
-    ///
-    /// This method implements an optimized version of:
-    ///
-    /// ```text
-    ///     let arr_scalar = Self::try_from_array(array, index).unwrap();
-    ///     arr_scalar.eq(self)
-    /// ```
-    ///
-    /// *Performance note*: the arrow compute kernels should be
-    /// preferred over this function if at all possible as they can be
-    /// vectorized and are generally much faster.
-    ///
-    /// This function has a few narrow usescases such as hash table key
-    /// comparisons where comparing a single row at a time is necessary.
-    #[inline]
-    pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool {
-        if let DataType::Dictionary(key_type, _) = array.data_type() {
-            return self.eq_array_dictionary(array, index, key_type);
-        }
-
-        match self {
-            ScalarValue::Decimal128(v, precision, scale) => {
-                ScalarValue::eq_array_decimal(array, index, v, *precision, *scale)
-            }
-            ScalarValue::Boolean(val) => {
-                eq_array_primitive!(array, index, BooleanArray, val)
-            }
-            ScalarValue::Float32(val) => {
-                eq_array_primitive!(array, index, Float32Array, val)
-            }
-            ScalarValue::Float64(val) => {
-                eq_array_primitive!(array, index, Float64Array, val)
-            }
-            ScalarValue::Int8(val) => eq_array_primitive!(array, index, Int8Array, val),
-            ScalarValue::Int16(val) => eq_array_primitive!(array, index, Int16Array, val),
-            ScalarValue::Int32(val) => eq_array_primitive!(array, index, Int32Array, val),
-            ScalarValue::Int64(val) => eq_array_primitive!(array, index, Int64Array, val),
-            ScalarValue::UInt8(val) => eq_array_primitive!(array, index, UInt8Array, val),
-            ScalarValue::UInt16(val) => {
-                eq_array_primitive!(array, index, UInt16Array, val)
-            }
-            ScalarValue::UInt32(val) => {
-                eq_array_primitive!(array, index, UInt32Array, val)
-            }
-            ScalarValue::UInt64(val) => {
-                eq_array_primitive!(array, index, UInt64Array, val)
-            }
-            ScalarValue::Utf8(val) => eq_array_primitive!(array, index, StringArray, val),
-            ScalarValue::LargeUtf8(val) => {
-                eq_array_primitive!(array, index, LargeStringArray, val)
-            }
-            ScalarValue::Binary(val) => {
-                eq_array_primitive!(array, index, BinaryArray, val)
-            }
-            ScalarValue::LargeBinary(val) => {
-                eq_array_primitive!(array, index, LargeBinaryArray, val)
-            }
-            ScalarValue::List(_, _) => unimplemented!(),
-            ScalarValue::Date32(val) => {
-                eq_array_primitive!(array, index, Date32Array, val)
-            }
-            ScalarValue::Date64(val) => {
-                eq_array_primitive!(array, index, Date64Array, val)
-            }
-            ScalarValue::TimestampSecond(val, _) => {
-                eq_array_primitive!(array, index, TimestampSecondArray, val)
-            }
-            ScalarValue::TimestampMillisecond(val, _) => {
-                eq_array_primitive!(array, index, TimestampMillisecondArray, val)
-            }
-            ScalarValue::TimestampMicrosecond(val, _) => {
-                eq_array_primitive!(array, index, TimestampMicrosecondArray, val)
-            }
-            ScalarValue::TimestampNanosecond(val, _) => {
-                eq_array_primitive!(array, index, TimestampNanosecondArray, val)
-            }
-            ScalarValue::IntervalYearMonth(val) => {
-                eq_array_primitive!(array, index, IntervalYearMonthArray, val)
-            }
-            ScalarValue::IntervalDayTime(val) => {
-                eq_array_primitive!(array, index, IntervalDayTimeArray, val)
-            }
-            ScalarValue::IntervalMonthDayNano(val) => {
-                eq_array_primitive!(array, index, IntervalMonthDayNanoArray, val)
-            }
-            ScalarValue::Struct(_, _) => unimplemented!(),
-        }
-    }
-
-    /// Compares a dictionary array with indexes of type `key_type`
-    /// with the array @ index for equality with self
-    fn eq_array_dictionary(
-        &self,
-        array: &ArrayRef,
-        index: usize,
-        key_type: &DataType,
-    ) -> bool {
-        let (values, values_index) = match key_type {
-            DataType::Int8 => get_dict_value::<Int8Type>(array, index).unwrap(),
-            DataType::Int16 => get_dict_value::<Int16Type>(array, index).unwrap(),
-            DataType::Int32 => get_dict_value::<Int32Type>(array, index).unwrap(),
-            DataType::Int64 => get_dict_value::<Int64Type>(array, index).unwrap(),
-            DataType::UInt8 => get_dict_value::<UInt8Type>(array, index).unwrap(),
-            DataType::UInt16 => get_dict_value::<UInt16Type>(array, index).unwrap(),
-            DataType::UInt32 => get_dict_value::<UInt32Type>(array, index).unwrap(),
-            DataType::UInt64 => get_dict_value::<UInt64Type>(array, index).unwrap(),
-            _ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
-        };
-
-        match values_index {
-            Some(values_index) => self.eq_array(values, values_index),
-            None => self.is_null(),
-        }
-    }
-}
-
-macro_rules! impl_scalar {
-    ($ty:ty, $scalar:tt) => {
-        impl From<$ty> for ScalarValue {
-            fn from(value: $ty) -> Self {
-                ScalarValue::$scalar(Some(value))
-            }
-        }
-
-        impl From<Option<$ty>> for ScalarValue {
-            fn from(value: Option<$ty>) -> Self {
-                ScalarValue::$scalar(value)
-            }
-        }
-    };
-}
-
-impl_scalar!(f64, Float64);
-impl_scalar!(f32, Float32);
-impl_scalar!(i8, Int8);
-impl_scalar!(i16, Int16);
-impl_scalar!(i32, Int32);
-impl_scalar!(i64, Int64);
-impl_scalar!(bool, Boolean);
-impl_scalar!(u8, UInt8);
-impl_scalar!(u16, UInt16);
-impl_scalar!(u32, UInt32);
-impl_scalar!(u64, UInt64);
-
-impl From<&str> for ScalarValue {
-    fn from(value: &str) -> Self {
-        Some(value).into()
+      assert_eq!(&array, &expected);
+    }};
+  }
+
+  #[test]
+  fn scalar_iter_to_array_boolean() {
+    check_scalar_iter!(Boolean, BooleanArray, vec![Some(true), None, Some(false)]);
+    check_scalar_iter!(Float32, Float32Array, vec![Some(1.9), None, Some(-2.1)]);
+    check_scalar_iter!(Float64, Float64Array, vec![Some(1.9), None, Some(-2.1)]);
+
+    check_scalar_iter!(Int8, Int8Array, vec![Some(1), None, Some(3)]);
+    check_scalar_iter!(Int16, Int16Array, vec![Some(1), None, Some(3)]);
+    check_scalar_iter!(Int32, Int32Array, vec![Some(1), None, Some(3)]);
+    check_scalar_iter!(Int64, Int64Array, vec![Some(1), None, Some(3)]);
+
+    check_scalar_iter!(UInt8, UInt8Array, vec![Some(1), None, Some(3)]);
+    check_scalar_iter!(UInt16, UInt16Array, vec![Some(1), None, Some(3)]);
+    check_scalar_iter!(UInt32, UInt32Array, vec![Some(1), None, Some(3)]);
+    check_scalar_iter!(UInt64, UInt64Array, vec![Some(1), None, Some(3)]);
+
+    check_scalar_iter_tz!(
+      TimestampSecond,
+      TimestampSecondArray,
+      vec![Some(1), None, Some(3)]
+    );
+    check_scalar_iter_tz!(
+      TimestampMillisecond,
+      TimestampMillisecondArray,
+      vec![Some(1), None, Some(3)]
+    );
+    check_scalar_iter_tz!(
+      TimestampMicrosecond,
+      TimestampMicrosecondArray,
+      vec![Some(1), None, Some(3)]
+    );
+    check_scalar_iter_tz!(
+      TimestampNanosecond,
+      TimestampNanosecondArray,
+      vec![Some(1), None, Some(3)]
+    );
+
+    check_scalar_iter_string!(Utf8, StringArray, vec![Some("foo"), None, Some("bar")]);
+    check_scalar_iter_string!(
+      LargeUtf8,
+      LargeStringArray,
+      vec![Some("foo"), None, Some("bar")]
+    );
+    check_scalar_iter_binary!(
+      Binary,
+      BinaryArray,
+      vec![Some(b"foo"), None, Some(b"bar")]
+    );
+    check_scalar_iter_binary!(
+      LargeBinary,
+      LargeBinaryArray,
+      vec![Some(b"foo"), None, Some(b"bar")]
+    );
+  }
+
+  #[test]
+  fn scalar_iter_to_array_empty() {
+    let scalars = vec![] as Vec<ScalarValue>;
+
+    let result = ScalarValue::iter_to_array(scalars.into_iter()).unwrap_err();
+    assert!(
+      result
+        .to_string()
+        .contains("Empty iterator passed to ScalarValue::iter_to_array"),
+      "{}",
+      result
+    );
+  }
+
+  #[test]
+  fn scalar_iter_to_array_mismatched_types() {
+    use ScalarValue::*;
+    // If the scalar values are not all the correct type, error here
+    let scalars: Vec<ScalarValue> = vec![Boolean(Some(true)), Int32(Some(5))];
+
+    let result = ScalarValue::iter_to_array(scalars.into_iter()).unwrap_err();
+    assert!(result.to_string().contains("Inconsistent types in ScalarValue::iter_to_array. Expected Boolean, got Int32(5)"),
+                "{}", result);
+  }
+
+  #[test]
+  fn scalar_try_from_array_null() {
+    let array = vec![Some(33), None].into_iter().collect::<Int64Array>();
+    let array: ArrayRef = Arc::new(array);
+
+    assert_eq!(
+      ScalarValue::Int64(Some(33)),
+      ScalarValue::try_from_array(&array, 0).unwrap()
+    );
+    assert_eq!(
+      ScalarValue::Int64(None),
+      ScalarValue::try_from_array(&array, 1).unwrap()
+    );
+  }
+
+  #[test]
+  fn scalar_try_from_dict_datatype() {
+    let data_type =
+      DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8));
+    let data_type = &data_type;
+    assert_eq!(ScalarValue::Utf8(None), data_type.try_into().unwrap())
+  }
+
+  #[test]
+  fn size_of_scalar() {
+    // Since ScalarValues are used in a non trivial number of places,
+    // making it larger means significant more memory consumption
+    // per distinct value.
+    #[cfg(target_arch = "aarch64")]
+    assert_eq!(std::mem::size_of::<ScalarValue>(), 64);
+
+    #[cfg(target_arch = "amd64")]
+    assert_eq!(std::mem::size_of::<ScalarValue>(), 48);
+  }
+
+  #[test]
+  fn scalar_eq_array() {
+    // Validate that eq_array has the same semantics as ScalarValue::eq
+    macro_rules! make_typed_vec {
+      ($INPUT:expr, $TYPE:ident) => {{
+        $INPUT
+          .iter()
+          .map(|v| v.map(|v| v as $TYPE))
+          .collect::<Vec<_>>()
+      }};
     }
-}
 
-impl From<Option<&str>> for ScalarValue {
-    fn from(value: Option<&str>) -> Self {
-        let value = value.map(|s| s.to_string());
-        ScalarValue::Utf8(value)
-    }
-}
+    let bool_vals = vec![Some(true), None, Some(false)];
+    let f32_vals = vec![Some(-1.0), None, Some(1.0)];
+    let f64_vals = make_typed_vec!(f32_vals, f64);
 
-impl FromStr for ScalarValue {
-    type Err = Infallible;
+    let i8_vals = vec![Some(-1), None, Some(1)];
+    let i16_vals = make_typed_vec!(i8_vals, i16);
+    let i32_vals = make_typed_vec!(i8_vals, i32);
+    let i64_vals = make_typed_vec!(i8_vals, i64);
 
-    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        Ok(s.into())
-    }
-}
+    let u8_vals = vec![Some(0), None, Some(1)];
+    let u16_vals = make_typed_vec!(u8_vals, u16);
+    let u32_vals = make_typed_vec!(u8_vals, u32);
+    let u64_vals = make_typed_vec!(u8_vals, u64);
 
-impl From<Vec<(&str, ScalarValue)>> for ScalarValue {
-    fn from(value: Vec<(&str, ScalarValue)>) -> Self {
-        let (fields, scalars): (Vec<_>, Vec<_>) = value
-            .into_iter()
-            .map(|(name, scalar)| {
-                (Field::new(name, scalar.get_datatype(), false), scalar)
-            })
-            .unzip();
+    let str_vals = vec![Some("foo"), None, Some("bar")];
 
-        Self::Struct(Some(Box::new(scalars)), Box::new(fields))
+    /// Test each value in `scalar` with the corresponding element
+    /// at `array`. Assumes each element is unique (aka not equal
+    /// with all other indexes)
+    struct TestCase {
+      array: ArrayRef,
+      scalars: Vec<ScalarValue>,
     }
-}
-
-macro_rules! impl_try_from {
-    ($SCALAR:ident, $NATIVE:ident) => {
-        impl TryFrom<ScalarValue> for $NATIVE {
-            type Error = DataFusionError;
-
-            fn try_from(value: ScalarValue) -> Result<Self> {
-                match value {
-                    ScalarValue::$SCALAR(Some(inner_value)) => Ok(inner_value),
-                    _ => Err(DataFusionError::Internal(format!(
-                        "Cannot convert {:?} to {}",
-                        value,
-                        std::any::type_name::<Self>()
-                    ))),
-                }
-            }
-        }
-    };
-}
 
-impl_try_from!(Int8, i8);
-impl_try_from!(Int16, i16);
-
-// special implementation for i32 because of Date32
-impl TryFrom<ScalarValue> for i32 {
-    type Error = DataFusionError;
-
-    fn try_from(value: ScalarValue) -> Result<Self> {
-        match value {
-            ScalarValue::Int32(Some(inner_value))
-            | ScalarValue::Date32(Some(inner_value)) => Ok(inner_value),
-            _ => Err(DataFusionError::Internal(format!(
-                "Cannot convert {:?} to {}",
-                value,
-                std::any::type_name::<Self>()
-            ))),
+    /// Create a test case for casing the input to the specified array type
+    macro_rules! make_test_case {
+      ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident) => {{
+        TestCase {
+          array: Arc::new($INPUT.iter().collect::<$ARRAY_TY>()),
+          scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(),
         }
-    }
-}
-
-// special implementation for i64 because of TimeNanosecond
-impl TryFrom<ScalarValue> for i64 {
-    type Error = DataFusionError;
-
-    fn try_from(value: ScalarValue) -> Result<Self> {
-        match value {
-            ScalarValue::Int64(Some(inner_value))
-            | ScalarValue::Date64(Some(inner_value))
-            | ScalarValue::TimestampNanosecond(Some(inner_value), _)
-            | ScalarValue::TimestampMicrosecond(Some(inner_value), _)
-            | ScalarValue::TimestampMillisecond(Some(inner_value), _)
-            | ScalarValue::TimestampSecond(Some(inner_value), _) => Ok(inner_value),
-            _ => Err(DataFusionError::Internal(format!(
-                "Cannot convert {:?} to {}",
-                value,
-                std::any::type_name::<Self>()
-            ))),
+      }};
+
+      ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident, $TZ:expr) => {{
+        let tz = $TZ;
+        TestCase {
+          array: Arc::new($INPUT.iter().collect::<$ARRAY_TY>()),
+          scalars: $INPUT
+            .iter()
+            .map(|v| ScalarValue::$SCALAR_TY(*v, tz.clone()))
+            .collect(),
         }
+      }};
     }
-}
 
-// special implementation for i128 because of Decimal128
-impl TryFrom<ScalarValue> for i128 {
-    type Error = DataFusionError;
-
-    fn try_from(value: ScalarValue) -> Result<Self> {
-        match value {
-            ScalarValue::Decimal128(Some(inner_value), _, _) => Ok(inner_value),
-            _ => Err(DataFusionError::Internal(format!(
-                "Cannot convert {:?} to {}",
-                value,
-                std::any::type_name::<Self>()
-            ))),
+    macro_rules! make_str_test_case {
+      ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident) => {{
+        TestCase {
+          array: Arc::new($INPUT.iter().cloned().collect::<$ARRAY_TY>()),
+          scalars: $INPUT
+            .iter()
+            .map(|v| ScalarValue::$SCALAR_TY(v.map(|v| v.to_string())))
+            .collect(),
         }
+      }};
     }
-}
 
-impl_try_from!(UInt8, u8);
-impl_try_from!(UInt16, u16);
-impl_try_from!(UInt32, u32);
-impl_try_from!(UInt64, u64);
-impl_try_from!(Float32, f32);
-impl_try_from!(Float64, f64);
-impl_try_from!(Boolean, bool);
-
-impl TryFrom<&DataType> for ScalarValue {
-    type Error = DataFusionError;
-
-    /// Create a Null instance of ScalarValue for this datatype
-    fn try_from(datatype: &DataType) -> Result<Self> {
-        Ok(match datatype {
-            DataType::Boolean => ScalarValue::Boolean(None),
-            DataType::Float64 => ScalarValue::Float64(None),
-            DataType::Float32 => ScalarValue::Float32(None),
-            DataType::Int8 => ScalarValue::Int8(None),
-            DataType::Int16 => ScalarValue::Int16(None),
-            DataType::Int32 => ScalarValue::Int32(None),
-            DataType::Int64 => ScalarValue::Int64(None),
-            DataType::UInt8 => ScalarValue::UInt8(None),
-            DataType::UInt16 => ScalarValue::UInt16(None),
-            DataType::UInt32 => ScalarValue::UInt32(None),
-            DataType::UInt64 => ScalarValue::UInt64(None),
-            DataType::Decimal(precision, scale) => {
-                ScalarValue::Decimal128(None, *precision, *scale)
-            }
-            DataType::Utf8 => ScalarValue::Utf8(None),
-            DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
-            DataType::Date32 => ScalarValue::Date32(None),
-            DataType::Date64 => ScalarValue::Date64(None),
-            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
-                ScalarValue::TimestampSecond(None, tz_opt.clone())
-            }
-            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
-                ScalarValue::TimestampMillisecond(None, tz_opt.clone())
-            }
-            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
-                ScalarValue::TimestampMicrosecond(None, tz_opt.clone())
-            }
-            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
-                ScalarValue::TimestampNanosecond(None, tz_opt.clone())
-            }
-            DataType::Dictionary(_index_type, value_type) => {
-                value_type.as_ref().try_into()?
-            }
-            DataType::List(ref nested_type) => {
-                ScalarValue::List(None, Box::new(nested_type.data_type().clone()))
-            }
-            DataType::Struct(fields) => {
-                ScalarValue::Struct(None, Box::new(fields.clone()))
-            }
-            _ => {
-                return Err(DataFusionError::NotImplemented(format!(
-                    "Can't create a scalar from data_type \"{:?}\"",
-                    datatype
-                )));
-            }
-        })
-    }
-}
-
-macro_rules! format_option {
-    ($F:expr, $EXPR:expr) => {{
-        match $EXPR {
-            Some(e) => write!($F, "{}", e),
-            None => write!($F, "NULL"),
+    macro_rules! make_binary_test_case {
+      ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident) => {{
+        TestCase {
+          array: Arc::new($INPUT.iter().cloned().collect::<$ARRAY_TY>()),
+          scalars: $INPUT
+            .iter()
+            .map(|v| ScalarValue::$SCALAR_TY(v.map(|v| v.as_bytes().to_vec())))
+            .collect(),
         }
-    }};
-}
-
-impl fmt::Display for ScalarValue {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            ScalarValue::Decimal128(v, p, s) => {
-                write!(f, "{:?},{:?},{:?}", v, p, s)?;
-            }
-            ScalarValue::Boolean(e) => format_option!(f, e)?,
-            ScalarValue::Float32(e) => format_option!(f, e)?,
-            ScalarValue::Float64(e) => format_option!(f, e)?,
-            ScalarValue::Int8(e) => format_option!(f, e)?,
-            ScalarValue::Int16(e) => format_option!(f, e)?,
-            ScalarValue::Int32(e) => format_option!(f, e)?,
-            ScalarValue::Int64(e) => format_option!(f, e)?,
-            ScalarValue::UInt8(e) => format_option!(f, e)?,
-            ScalarValue::UInt16(e) => format_option!(f, e)?,
-            ScalarValue::UInt32(e) => format_option!(f, e)?,
-            ScalarValue::UInt64(e) => format_option!(f, e)?,
-            ScalarValue::TimestampSecond(e, _) => format_option!(f, e)?,
-            ScalarValue::TimestampMillisecond(e, _) => format_option!(f, e)?,
-            ScalarValue::TimestampMicrosecond(e, _) => format_option!(f, e)?,
-            ScalarValue::TimestampNanosecond(e, _) => format_option!(f, e)?,
-            ScalarValue::Utf8(e) => format_option!(f, e)?,
-            ScalarValue::LargeUtf8(e) => format_option!(f, e)?,
-            ScalarValue::Binary(e) => match e {
-                Some(l) => write!(
-                    f,
-                    "{}",
-                    l.iter()
-                        .map(|v| format!("{}", v))
-                        .collect::<Vec<_>>()
-                        .join(",")
-                )?,
-                None => write!(f, "NULL")?,
-            },
-            ScalarValue::LargeBinary(e) => match e {
-                Some(l) => write!(
-                    f,
-                    "{}",
-                    l.iter()
-                        .map(|v| format!("{}", v))
-                        .collect::<Vec<_>>()
-                        .join(",")
-                )?,
-                None => write!(f, "NULL")?,
-            },
-            ScalarValue::List(e, _) => match e {
-                Some(l) => write!(
-                    f,
-                    "{}",
-                    l.iter()
-                        .map(|v| format!("{}", v))
-                        .collect::<Vec<_>>()
-                        .join(",")
-                )?,
-                None => write!(f, "NULL")?,
-            },
-            ScalarValue::Date32(e) => format_option!(f, e)?,
-            ScalarValue::Date64(e) => format_option!(f, e)?,
-            ScalarValue::IntervalDayTime(e) => format_option!(f, e)?,
-            ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?,
-            ScalarValue::IntervalMonthDayNano(e) => format_option!(f, e)?,
-            ScalarValue::Struct(e, fields) => match e {
-                Some(l) => write!(
-                    f,
-                    "{{{}}}",
-                    l.iter()
-                        .zip(fields.iter())
-                        .map(|(value, field)| format!("{}:{}", field.name(), value))
-                        .collect::<Vec<_>>()
-                        .join(",")
-                )?,
-                None => write!(f, "NULL")?,
-            },
-        };
-        Ok(())
+      }};
     }
-}
 
-impl fmt::Debug for ScalarValue {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            ScalarValue::Decimal128(_, _, _) => write!(f, "Decimal128({})", self),
-            ScalarValue::Boolean(_) => write!(f, "Boolean({})", self),
-            ScalarValue::Float32(_) => write!(f, "Float32({})", self),
-            ScalarValue::Float64(_) => write!(f, "Float64({})", self),
-            ScalarValue::Int8(_) => write!(f, "Int8({})", self),
-            ScalarValue::Int16(_) => write!(f, "Int16({})", self),
-            ScalarValue::Int32(_) => write!(f, "Int32({})", self),
-            ScalarValue::Int64(_) => write!(f, "Int64({})", self),
-            ScalarValue::UInt8(_) => write!(f, "UInt8({})", self),
-            ScalarValue::UInt16(_) => write!(f, "UInt16({})", self),
-            ScalarValue::UInt32(_) => write!(f, "UInt32({})", self),
-            ScalarValue::UInt64(_) => write!(f, "UInt64({})", self),
-            ScalarValue::TimestampSecond(_, tz_opt) => {
-                write!(f, "TimestampSecond({}, {:?})", self, tz_opt)
-            }
-            ScalarValue::TimestampMillisecond(_, tz_opt) => {
-                write!(f, "TimestampMillisecond({}, {:?})", self, tz_opt)
-            }
-            ScalarValue::TimestampMicrosecond(_, tz_opt) => {
-                write!(f, "TimestampMicrosecond({}, {:?})", self, tz_opt)
-            }
-            ScalarValue::TimestampNanosecond(_, tz_opt) => {
-                write!(f, "TimestampNanosecond({}, {:?})", self, tz_opt)
-            }
-            ScalarValue::Utf8(None) => write!(f, "Utf8({})", self),
-            ScalarValue::Utf8(Some(_)) => write!(f, "Utf8(\"{}\")", self),
-            ScalarValue::LargeUtf8(None) => write!(f, "LargeUtf8({})", self),
-            ScalarValue::LargeUtf8(Some(_)) => write!(f, "LargeUtf8(\"{}\")", self),
-            ScalarValue::Binary(None) => write!(f, "Binary({})", self),
-            ScalarValue::Binary(Some(_)) => write!(f, "Binary(\"{}\")", self),
-            ScalarValue::LargeBinary(None) => write!(f, "LargeBinary({})", self),
-            ScalarValue::LargeBinary(Some(_)) => write!(f, "LargeBinary(\"{}\")", self),
-            ScalarValue::List(_, _) => write!(f, "List([{}])", self),
-            ScalarValue::Date32(_) => write!(f, "Date32(\"{}\")", self),
-            ScalarValue::Date64(_) => write!(f, "Date64(\"{}\")", self),
-            ScalarValue::IntervalDayTime(_) => {
-                write!(f, "IntervalDayTime(\"{}\")", self)
-            }
-            ScalarValue::IntervalYearMonth(_) => {
-                write!(f, "IntervalYearMonth(\"{}\")", self)
-            }
-            ScalarValue::IntervalMonthDayNano(_) => {
-                write!(f, "IntervalMonthDayNano(\"{}\")", self)
-            }
-            ScalarValue::Struct(e, fields) => {
-                // Use Debug representation of field values
-                match e {
-                    Some(l) => write!(
-                        f,
-                        "Struct({{{}}})",
-                        l.iter()
-                            .zip(fields.iter())
-                            .map(|(value, field)| format!("{}:{:?}", field.name(), value))
-                            .collect::<Vec<_>>()
-                            .join(",")
-                    ),
-                    None => write!(f, "Struct(NULL)"),
-                }
-            }
+    /// create a test case for DictionaryArray<$INDEX_TY>
+    macro_rules! make_str_dict_test_case {
+      ($INPUT:expr, $INDEX_TY:ident, $SCALAR_TY:ident) => {{
+        TestCase {
+          array: Arc::new(
+            $INPUT
+              .iter()
+              .cloned()
+              .collect::<DictionaryArray<$INDEX_TY>>(),
+          ),
+          scalars: $INPUT
+            .iter()
+            .map(|v| ScalarValue::$SCALAR_TY(v.map(|v| v.to_string())))
+            .collect(),
         }
+      }};
     }
-}
-
-/// Trait used to map a NativeTime to a ScalarType.
-pub trait ScalarType<T: ArrowNativeType> {
-    /// returns a scalar from an optional T
-    fn scalar(r: Option<T>) -> ScalarValue;
-}
-
-impl ScalarType<f32> for Float32Type {
-    fn scalar(r: Option<f32>) -> ScalarValue {
-        ScalarValue::Float32(r)
-    }
-}
-
-impl ScalarType<i64> for TimestampSecondType {
-    fn scalar(r: Option<i64>) -> ScalarValue {
-        ScalarValue::TimestampSecond(r, None)
-    }
-}
-
-impl ScalarType<i64> for TimestampMillisecondType {
-    fn scalar(r: Option<i64>) -> ScalarValue {
-        ScalarValue::TimestampMillisecond(r, None)
-    }
-}
-
-impl ScalarType<i64> for TimestampMicrosecondType {
-    fn scalar(r: Option<i64>) -> ScalarValue {
-        ScalarValue::TimestampMicrosecond(r, None)
-    }
-}
-
-impl ScalarType<i64> for TimestampNanosecondType {
-    fn scalar(r: Option<i64>) -> ScalarValue {
-        ScalarValue::TimestampNanosecond(r, None)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::from_slice::FromSlice;
-
-    #[test]
-    fn scalar_decimal_test() {
-        let decimal_value = ScalarValue::Decimal128(Some(123), 10, 1);
-        assert_eq!(DataType::Decimal(10, 1), decimal_value.get_datatype());
-        let try_into_value: i128 = decimal_value.clone().try_into().unwrap();
-        assert_eq!(123_i128, try_into_value);
-        assert!(!decimal_value.is_null());
-        let neg_decimal_value = decimal_value.arithmetic_negate();
-        match neg_decimal_value {
-            ScalarValue::Decimal128(v, _, _) => {
-                assert_eq!(-123, v.unwrap());
-            }
-            _ => {
-                unreachable!();
-            }
-        }
-
-        // decimal scalar to array
-        let array = decimal_value.to_array();
-        let array = array.as_any().downcast_ref::<DecimalArray>().unwrap();
-        assert_eq!(1, array.len());
-        assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
-        assert_eq!(123i128, array.value(0));
-
-        // decimal scalar to array with size
-        let array = decimal_value.to_array_of_size(10);
-        let array_decimal = array.as_any().downcast_ref::<DecimalArray>().unwrap();
-        assert_eq!(10, array.len());
-        assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
-        assert_eq!(123i128, array_decimal.value(0));
-        assert_eq!(123i128, array_decimal.value(9));
-        // test eq array
-        assert!(decimal_value.eq_array(&array, 1));
-        assert!(decimal_value.eq_array(&array, 5));
-        // test try from array
-        assert_eq!(
-            decimal_value,
-            ScalarValue::try_from_array(&array, 5).unwrap()
-        );
 
-        assert_eq!(
-            decimal_value,
-            ScalarValue::try_new_decimal128(123, 10, 1).unwrap()
-        );
-
-        // test compare
-        let left = ScalarValue::Decimal128(Some(123), 10, 2);
-        let right = ScalarValue::Decimal128(Some(124), 10, 2);
-        assert!(!left.eq(&right));
-        let result = left < right;
-        assert!(result);
-        let result = left <= right;
-        assert!(result);
-        let right = ScalarValue::Decimal128(Some(124), 10, 3);
-        // make sure that two decimals with diff datatype can't be compared.
-        let result = left.partial_cmp(&right);
-        assert_eq!(None, result);
-
-        let decimal_vec = vec![
-            ScalarValue::Decimal128(Some(1), 10, 2),
-            ScalarValue::Decimal128(Some(2), 10, 2),
-            ScalarValue::Decimal128(Some(3), 10, 2),
-        ];
-        // convert the vec to decimal array and check the result
-        let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap();
-        assert_eq!(3, array.len());
-        assert_eq!(DataType::Decimal(10, 2), array.data_type().clone());
-
-        let decimal_vec = vec![
-            ScalarValue::Decimal128(Some(1), 10, 2),
-            ScalarValue::Decimal128(Some(2), 10, 2),
-            ScalarValue::Decimal128(Some(3), 10, 2),
-            ScalarValue::Decimal128(None, 10, 2),
-        ];
-        let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap();
-        assert_eq!(4, array.len());
-        assert_eq!(DataType::Decimal(10, 2), array.data_type().clone());
-
-        assert!(ScalarValue::try_new_decimal128(1, 10, 2)
-            .unwrap()
-            .eq_array(&array, 0));
-        assert!(ScalarValue::try_new_decimal128(2, 10, 2)
-            .unwrap()
-            .eq_array(&array, 1));
-        assert!(ScalarValue::try_new_decimal128(3, 10, 2)
-            .unwrap()
-            .eq_array(&array, 2));
-        assert_eq!(
-            ScalarValue::Decimal128(None, 10, 2),
-            ScalarValue::try_from_array(&array, 3).unwrap()
-        );
-        assert_eq!(
-            ScalarValue::Decimal128(None, 10, 2),
-            ScalarValue::try_from_array(&array, 4).unwrap()
-        );
-    }
-
-    #[test]
-    fn scalar_value_to_array_u64() {
-        let value = ScalarValue::UInt64(Some(13u64));
-        let array = value.to_array();
-        let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
-        assert_eq!(array.len(), 1);
-        assert!(!array.is_null(0));
-        assert_eq!(array.value(0), 13);
-
-        let value = ScalarValue::UInt64(None);
-        let array = value.to_array();
-        let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
-        assert_eq!(array.len(), 1);
-        assert!(array.is_null(0));
-    }
-
-    #[test]
-    fn scalar_value_to_array_u32() {
-        let value = ScalarValue::UInt32(Some(13u32));
-        let array = value.to_array();
-        let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
-        assert_eq!(array.len(), 1);
-        assert!(!array.is_null(0));
-        assert_eq!(array.value(0), 13);
-
-        let value = ScalarValue::UInt32(None);
-        let array = value.to_array();
-        let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
-        assert_eq!(array.len(), 1);
-        assert!(array.is_null(0));
-    }
-
-    #[test]
-    fn scalar_list_null_to_array() {
-        let list_array_ref =
-            ScalarValue::List(None, Box::new(DataType::UInt64)).to_array();
-        let list_array = list_array_ref.as_any().downcast_ref::<ListArray>().unwrap();
-
-        assert!(list_array.is_null(0));
-        assert_eq!(list_array.len(), 1);
-        assert_eq!(list_array.values().len(), 0);
-    }
-
-    #[test]
-    fn scalar_list_to_array() {
-        let list_array_ref = ScalarValue::List(
-            Some(Box::new(vec![
-                ScalarValue::UInt64(Some(100)),
-                ScalarValue::UInt64(None),
-                ScalarValue::UInt64(Some(101)),
-            ])),
-            Box::new(DataType::UInt64),
-        )
-        .to_array();
-
-        let list_array = list_array_ref.as_any().downcast_ref::<ListArray>().unwrap();
-        assert_eq!(list_array.len(), 1);
-        assert_eq!(list_array.values().len(), 3);
-
-        let prim_array_ref = list_array.value(0);
-        let prim_array = prim_array_ref
-            .as_any()
-            .downcast_ref::<UInt64Array>()
-            .unwrap();
-        assert_eq!(prim_array.len(), 3);
-        assert_eq!(prim_array.value(0), 100);
-        assert!(prim_array.is_null(1));
-        assert_eq!(prim_array.value(2), 101);
-    }
-
-    /// Creates array directly and via ScalarValue and ensures they are the same
-    macro_rules! check_scalar_iter {
-        ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
-            let scalars: Vec<_> =
-                $INPUT.iter().map(|v| ScalarValue::$SCALAR_T(*v)).collect();
-
-            let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
-
-            let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
-
-            assert_eq!(&array, &expected);
-        }};
-    }
-
-    /// Creates array directly and via ScalarValue and ensures they are the same
-    /// but for variants that carry a timezone field.
-    macro_rules! check_scalar_iter_tz {
-        ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
-            let scalars: Vec<_> = $INPUT
-                .iter()
-                .map(|v| ScalarValue::$SCALAR_T(*v, None))
-                .collect();
-
-            let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
-
-            let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
-
-            assert_eq!(&array, &expected);
-        }};
-    }
-
-    /// Creates array directly and via ScalarValue and ensures they
-    /// are the same, for string  arrays
-    macro_rules! check_scalar_iter_string {
-        ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
-            let scalars: Vec<_> = $INPUT
-                .iter()
-                .map(|v| ScalarValue::$SCALAR_T(v.map(|v| v.to_string())))
-                .collect();
-
-            let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
-
-            let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
-
-            assert_eq!(&array, &expected);
-        }};
-    }
-
-    /// Creates array directly and via ScalarValue and ensures they
-    /// are the same, for binary arrays
-    macro_rules! check_scalar_iter_binary {
-        ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
-            let scalars: Vec<_> = $INPUT
-                .iter()
-                .map(|v| ScalarValue::$SCALAR_T(v.map(|v| v.to_vec())))
-                .collect();
-
-            let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
-
-            let expected: $ARRAYTYPE =
-                $INPUT.iter().map(|v| v.map(|v| v.to_vec())).collect();
-
-            let expected: ArrayRef = Arc::new(expected);
-
-            assert_eq!(&array, &expected);
-        }};
-    }
-
-    #[test]
-    fn scalar_iter_to_array_boolean() {
-        check_scalar_iter!(Boolean, BooleanArray, vec![Some(true), None, Some(false)]);
-        check_scalar_iter!(Float32, Float32Array, vec![Some(1.9), None, Some(-2.1)]);
-        check_scalar_iter!(Float64, Float64Array, vec![Some(1.9), None, Some(-2.1)]);
-
-        check_scalar_iter!(Int8, Int8Array, vec![Some(1), None, Some(3)]);
-        check_scalar_iter!(Int16, Int16Array, vec![Some(1), None, Some(3)]);
-        check_scalar_iter!(Int32, Int32Array, vec![Some(1), None, Some(3)]);
-        check_scalar_iter!(Int64, Int64Array, vec![Some(1), None, Some(3)]);
-
-        check_scalar_iter!(UInt8, UInt8Array, vec![Some(1), None, Some(3)]);
-        check_scalar_iter!(UInt16, UInt16Array, vec![Some(1), None, Some(3)]);
-        check_scalar_iter!(UInt32, UInt32Array, vec![Some(1), None, Some(3)]);
-        check_scalar_iter!(UInt64, UInt64Array, vec![Some(1), None, Some(3)]);
-
-        check_scalar_iter_tz!(
-            TimestampSecond,
-            TimestampSecondArray,
-            vec![Some(1), None, Some(3)]
-        );
-        check_scalar_iter_tz!(
-            TimestampMillisecond,
-            TimestampMillisecondArray,
-            vec![Some(1), None, Some(3)]
-        );
-        check_scalar_iter_tz!(
-            TimestampMicrosecond,
-            TimestampMicrosecondArray,
-            vec![Some(1), None, Some(3)]
-        );
-        check_scalar_iter_tz!(
-            TimestampNanosecond,
-            TimestampNanosecondArray,
-            vec![Some(1), None, Some(3)]
-        );
-
-        check_scalar_iter_string!(
-            Utf8,
-            StringArray,
-            vec![Some("foo"), None, Some("bar")]
-        );
-        check_scalar_iter_string!(
-            LargeUtf8,
-            LargeStringArray,
-            vec![Some("foo"), None, Some("bar")]
-        );
-        check_scalar_iter_binary!(
-            Binary,
-            BinaryArray,
-            vec![Some(b"foo"), None, Some(b"bar")]
-        );
-        check_scalar_iter_binary!(
-            LargeBinary,
-            LargeBinaryArray,
-            vec![Some(b"foo"), None, Some(b"bar")]
-        );
-    }
-
-    #[test]
-    fn scalar_iter_to_array_empty() {
-        let scalars = vec![] as Vec<ScalarValue>;
-
-        let result = ScalarValue::iter_to_array(scalars.into_iter()).unwrap_err();
+    let cases = vec![
+      make_test_case!(bool_vals, BooleanArray, Boolean),
+      make_test_case!(f32_vals, Float32Array, Float32),
+      make_test_case!(f64_vals, Float64Array, Float64),
+      make_test_case!(i8_vals, Int8Array, Int8),
+      make_test_case!(i16_vals, Int16Array, Int16),
+      make_test_case!(i32_vals, Int32Array, Int32),
+      make_test_case!(i64_vals, Int64Array, Int64),
+      make_test_case!(u8_vals, UInt8Array, UInt8),
+      make_test_case!(u16_vals, UInt16Array, UInt16),
+      make_test_case!(u32_vals, UInt32Array, UInt32),
+      make_test_case!(u64_vals, UInt64Array, UInt64),
+      make_str_test_case!(str_vals, StringArray, Utf8),
+      make_str_test_case!(str_vals, LargeStringArray, LargeUtf8),
+      make_binary_test_case!(str_vals, BinaryArray, Binary),
+      make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary),
+      make_test_case!(i32_vals, Date32Array, Date32),
+      make_test_case!(i64_vals, Date64Array, Date64),
+      make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond, None),
+      make_test_case!(
+        i64_vals,
+        TimestampSecondArray,
+        TimestampSecond,
+        Some("UTC".to_owned())
+      ),
+      make_test_case!(
+        i64_vals,
+        TimestampMillisecondArray,
+        TimestampMillisecond,
+        None
+      ),
+      make_test_case!(
+        i64_vals,
+        TimestampMillisecondArray,
+        TimestampMillisecond,
+        Some("UTC".to_owned())
+      ),
+      make_test_case!(
+        i64_vals,
+        TimestampMicrosecondArray,
+        TimestampMicrosecond,
+        None
+      ),
+      make_test_case!(
+        i64_vals,
+        TimestampMicrosecondArray,
+        TimestampMicrosecond,
+        Some("UTC".to_owned())
+      ),
+      make_test_case!(
+        i64_vals,
+        TimestampNanosecondArray,
+        TimestampNanosecond,
+        None
+      ),
+      make_test_case!(
+        i64_vals,
+        TimestampNanosecondArray,
+        TimestampNanosecond,
+        Some("UTC".to_owned())
+      ),
+      make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth),
+      make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime),
+      make_str_dict_test_case!(str_vals, Int8Type, Utf8),
+      make_str_dict_test_case!(str_vals, Int16Type, Utf8),
+      make_str_dict_test_case!(str_vals, Int32Type, Utf8),
+      make_str_dict_test_case!(str_vals, Int64Type, Utf8),
+      make_str_dict_test_case!(str_vals, UInt8Type, Utf8),
+      make_str_dict_test_case!(str_vals, UInt16Type, Utf8),
+      make_str_dict_test_case!(str_vals, UInt32Type, Utf8),
+      make_str_dict_test_case!(str_vals, UInt64Type, Utf8),
+    ];
+
+    for case in cases {
+      let TestCase { array, scalars } = case;
+      assert_eq!(array.len(), scalars.len());
+
+      for (index, scalar) in scalars.into_iter().enumerate() {
         assert!(
-            result
-                .to_string()
-                .contains("Empty iterator passed to ScalarValue::iter_to_array"),
-            "{}",
-            result
-        );
-    }
-
-    #[test]
-    fn scalar_iter_to_array_mismatched_types() {
-        use ScalarValue::*;
-        // If the scalar values are not all the correct type, error here
-        let scalars: Vec<ScalarValue> = vec![Boolean(Some(true)), Int32(Some(5))];
-
-        let result = ScalarValue::iter_to_array(scalars.into_iter()).unwrap_err();
-        assert!(result.to_string().contains("Inconsistent types in ScalarValue::iter_to_array. Expected Boolean, got Int32(5)"),
-                "{}", result);
-    }
-
-    #[test]
-    fn scalar_try_from_array_null() {
-        let array = vec![Some(33), None].into_iter().collect::<Int64Array>();
-        let array: ArrayRef = Arc::new(array);
-
-        assert_eq!(
-            ScalarValue::Int64(Some(33)),
-            ScalarValue::try_from_array(&array, 0).unwrap()
+          scalar.eq_array(&array, index),
+          "Expected {:?} to be equal to {:?} at index {}",
+          scalar,
+          array,
+          index
         );
-        assert_eq!(
-            ScalarValue::Int64(None),
-            ScalarValue::try_from_array(&array, 1).unwrap()
-        );
-    }
-
-    #[test]
-    fn scalar_try_from_dict_datatype() {
-        let data_type =
-            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8));
-        let data_type = &data_type;
-        assert_eq!(ScalarValue::Utf8(None), data_type.try_into().unwrap())
-    }
-
-    #[test]
-    fn size_of_scalar() {
-        // Since ScalarValues are used in a non trivial number of places,
-        // making it larger means significant more memory consumption
-        // per distinct value.
-        #[cfg(target_arch = "aarch64")]
-        assert_eq!(std::mem::size_of::<ScalarValue>(), 64);
-
-        #[cfg(target_arch = "amd64")]
-        assert_eq!(std::mem::size_of::<ScalarValue>(), 48);
-    }
-
-    #[test]
-    fn scalar_eq_array() {
-        // Validate that eq_array has the same semantics as ScalarValue::eq
-        macro_rules! make_typed_vec {
-            ($INPUT:expr, $TYPE:ident) => {{
-                $INPUT
-                    .iter()
-                    .map(|v| v.map(|v| v as $TYPE))
-                    .collect::<Vec<_>>()
-            }};
-        }
-
-        let bool_vals = vec![Some(true), None, Some(false)];
-        let f32_vals = vec![Some(-1.0), None, Some(1.0)];
-        let f64_vals = make_typed_vec!(f32_vals, f64);
-
-        let i8_vals = vec![Some(-1), None, Some(1)];
-        let i16_vals = make_typed_vec!(i8_vals, i16);
-        let i32_vals = make_typed_vec!(i8_vals, i32);
-        let i64_vals = make_typed_vec!(i8_vals, i64);
-
-        let u8_vals = vec![Some(0), None, Some(1)];
-        let u16_vals = make_typed_vec!(u8_vals, u16);
-        let u32_vals = make_typed_vec!(u8_vals, u32);
-        let u64_vals = make_typed_vec!(u8_vals, u64);
-
-        let str_vals = vec![Some("foo"), None, Some("bar")];
-
-        /// Test each value in `scalar` with the corresponding element
-        /// at `array`. Assumes each element is unique (aka not equal
-        /// with all other indexes)
-        struct TestCase {
-            array: ArrayRef,
-            scalars: Vec<ScalarValue>,
-        }
-
-        /// Create a test case for casing the input to the specified array type
-        macro_rules! make_test_case {
-            ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident) => {{
-                TestCase {
-                    array: Arc::new($INPUT.iter().collect::<$ARRAY_TY>()),
-                    scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(),
-                }
-            }};
-
-            ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident, $TZ:expr) => {{
-                let tz = $TZ;
-                TestCase {
-                    array: Arc::new($INPUT.iter().collect::<$ARRAY_TY>()),
-                    scalars: $INPUT
-                        .iter()
-                        .map(|v| ScalarValue::$SCALAR_TY(*v, tz.clone()))
-                        .collect(),
-                }
-            }};
-        }
-
-        macro_rules! make_str_test_case {
-            ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident) => {{
-                TestCase {
-                    array: Arc::new($INPUT.iter().cloned().collect::<$ARRAY_TY>()),
-                    scalars: $INPUT
-                        .iter()
-                        .map(|v| ScalarValue::$SCALAR_TY(v.map(|v| v.to_string())))
-                        .collect(),
-                }
-            }};
-        }
-
-        macro_rules! make_binary_test_case {
-            ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident) => {{
-                TestCase {
-                    array: Arc::new($INPUT.iter().cloned().collect::<$ARRAY_TY>()),
-                    scalars: $INPUT
-                        .iter()
-                        .map(|v| {
-                            ScalarValue::$SCALAR_TY(v.map(|v| v.as_bytes().to_vec()))
-                        })
-                        .collect(),
-                }
-            }};
-        }
-
-        /// create a test case for DictionaryArray<$INDEX_TY>
-        macro_rules! make_str_dict_test_case {
-            ($INPUT:expr, $INDEX_TY:ident, $SCALAR_TY:ident) => {{
-                TestCase {
-                    array: Arc::new(
-                        $INPUT
-                            .iter()
-                            .cloned()
-                            .collect::<DictionaryArray<$INDEX_TY>>(),
-                    ),
-                    scalars: $INPUT
-                        .iter()
-                        .map(|v| ScalarValue::$SCALAR_TY(v.map(|v| v.to_string())))
-                        .collect(),
-                }
-            }};
-        }
 
-        let cases = vec![
-            make_test_case!(bool_vals, BooleanArray, Boolean),
-            make_test_case!(f32_vals, Float32Array, Float32),
-            make_test_case!(f64_vals, Float64Array, Float64),
-            make_test_case!(i8_vals, Int8Array, Int8),
-            make_test_case!(i16_vals, Int16Array, Int16),
-            make_test_case!(i32_vals, Int32Array, Int32),
-            make_test_case!(i64_vals, Int64Array, Int64),
-            make_test_case!(u8_vals, UInt8Array, UInt8),
-            make_test_case!(u16_vals, UInt16Array, UInt16),
-            make_test_case!(u32_vals, UInt32Array, UInt32),
-            make_test_case!(u64_vals, UInt64Array, UInt64),
-            make_str_test_case!(str_vals, StringArray, Utf8),
-            make_str_test_case!(str_vals, LargeStringArray, LargeUtf8),
-            make_binary_test_case!(str_vals, BinaryArray, Binary),
-            make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary),
-            make_test_case!(i32_vals, Date32Array, Date32),
-            make_test_case!(i64_vals, Date64Array, Date64),
-            make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond, None),
-            make_test_case!(
-                i64_vals,
-                TimestampSecondArray,
-                TimestampSecond,
-                Some("UTC".to_owned())
-            ),
-            make_test_case!(
-                i64_vals,
-                TimestampMillisecondArray,
-                TimestampMillisecond,
-                None
-            ),
-            make_test_case!(
-                i64_vals,
-                TimestampMillisecondArray,
-                TimestampMillisecond,
-                Some("UTC".to_owned())
-            ),
-            make_test_case!(
-                i64_vals,
-                TimestampMicrosecondArray,
-                TimestampMicrosecond,
-                None
-            ),
-            make_test_case!(
-                i64_vals,
-                TimestampMicrosecondArray,
-                TimestampMicrosecond,
-                Some("UTC".to_owned())
-            ),
-            make_test_case!(
-                i64_vals,
-                TimestampNanosecondArray,
-                TimestampNanosecond,
-                None
-            ),
-            make_test_case!(
-                i64_vals,
-                TimestampNanosecondArray,
-                TimestampNanosecond,
-                Some("UTC".to_owned())
-            ),
-            make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth),
-            make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime),
-            make_str_dict_test_case!(str_vals, Int8Type, Utf8),
-            make_str_dict_test_case!(str_vals, Int16Type, Utf8),
-            make_str_dict_test_case!(str_vals, Int32Type, Utf8),
-            make_str_dict_test_case!(str_vals, Int64Type, Utf8),
-            make_str_dict_test_case!(str_vals, UInt8Type, Utf8),
-            make_str_dict_test_case!(str_vals, UInt16Type, Utf8),
-            make_str_dict_test_case!(str_vals, UInt32Type, Utf8),
-            make_str_dict_test_case!(str_vals, UInt64Type, Utf8),
-        ];
-
-        for case in cases {
-            let TestCase { array, scalars } = case;
-            assert_eq!(array.len(), scalars.len());
-
-            for (index, scalar) in scalars.into_iter().enumerate() {
-                assert!(
-                    scalar.eq_array(&array, index),
-                    "Expected {:?} to be equal to {:?} at index {}",
-                    scalar,
-                    array,
-                    index
-                );
-
-                // test that all other elements are *not* equal
-                for other_index in 0..array.len() {
-                    if index != other_index {
-                        assert!(
-                            !scalar.eq_array(&array, other_index),
-                            "Expected {:?} to be NOT equal to {:?} at index {}",
-                            scalar,
-                            array,
-                            other_index
-                        );
-                    }
-                }
-            }
+        // test that all other elements are *not* equal
+        for other_index in 0..array.len() {
+          if index != other_index {
+            assert!(
+              !scalar.eq_array(&array, other_index),
+              "Expected {:?} to be NOT equal to {:?} at index {}",
+              scalar,
+              array,
+              other_index
+            );
+          }
         }
+      }
     }
-
-    #[test]
-    fn scalar_partial_ordering() {
-        use ScalarValue::*;
-
-        assert_eq!(
-            Int64(Some(33)).partial_cmp(&Int64(Some(0))),
-            Some(Ordering::Greater)
-        );
-        assert_eq!(
-            Int64(Some(0)).partial_cmp(&Int64(Some(33))),
-            Some(Ordering::Less)
-        );
-        assert_eq!(
-            Int64(Some(33)).partial_cmp(&Int64(Some(33))),
-            Some(Ordering::Equal)
-        );
-        // For different data type, `partial_cmp` returns None.
-        assert_eq!(Int64(Some(33)).partial_cmp(&Int32(Some(33))), None);
-        assert_eq!(Int32(Some(33)).partial_cmp(&Int64(Some(33))), None);
-
-        assert_eq!(
-            List(
-                Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
-                Box::new(DataType::Int32),
-            )
-            .partial_cmp(&List(
-                Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
-                Box::new(DataType::Int32),
-            )),
-            Some(Ordering::Equal)
-        );
-
-        assert_eq!(
-            List(
-                Some(Box::new(vec![Int32(Some(10)), Int32(Some(5))])),
-                Box::new(DataType::Int32),
-            )
-            .partial_cmp(&List(
-                Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
-                Box::new(DataType::Int32),
-            )),
-            Some(Ordering::Greater)
-        );
-
-        assert_eq!(
-            List(
-                Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
-                Box::new(DataType::Int32),
-            )
-            .partial_cmp(&List(
-                Some(Box::new(vec![Int32(Some(10)), Int32(Some(5))])),
-                Box::new(DataType::Int32),
-            )),
-            Some(Ordering::Less)
-        );
-
-        // For different data type, `partial_cmp` returns None.
-        assert_eq!(
-            List(
-                Some(Box::new(vec![Int64(Some(1)), Int64(Some(5))])),
-                Box::new(DataType::Int64),
-            )
-            .partial_cmp(&List(
-                Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
-                Box::new(DataType::Int32),
-            )),
-            None
-        );
-
-        assert_eq!(
-            ScalarValue::from(vec![
-                ("A", ScalarValue::from(1.0)),
-                ("B", ScalarValue::from("Z")),
-            ])
-            .partial_cmp(&ScalarValue::from(vec![
-                ("A", ScalarValue::from(2.0)),
-                ("B", ScalarValue::from("A")),
-            ])),
-            Some(Ordering::Less)
-        );
-
-        // For different struct fields, `partial_cmp` returns None.
-        assert_eq!(
-            ScalarValue::from(vec![
-                ("A", ScalarValue::from(1.0)),
-                ("B", ScalarValue::from("Z")),
-            ])
-            .partial_cmp(&ScalarValue::from(vec![
-                ("a", ScalarValue::from(2.0)),
-                ("b", ScalarValue::from("A")),
-            ])),
-            None
-        );
-    }
-
-    #[test]
-    fn test_scalar_struct() {
-        let field_a = Field::new("A", DataType::Int32, false);
-        let field_b = Field::new("B", DataType::Boolean, false);
-        let field_c = Field::new("C", DataType::Utf8, false);
-
-        let field_e = Field::new("e", DataType::Int16, false);
-        let field_f = Field::new("f", DataType::Int64, false);
-        let field_d = Field::new(
-            "D",
-            DataType::Struct(vec![field_e.clone(), field_f.clone()]),
-            false,
-        );
-
-        let scalar = ScalarValue::Struct(
-            Some(Box::new(vec![
-                ScalarValue::Int32(Some(23)),
-                ScalarValue::Boolean(Some(false)),
-                ScalarValue::Utf8(Some("Hello".to_string())),
-                ScalarValue::from(vec![
-                    ("e", ScalarValue::from(2i16)),
-                    ("f", ScalarValue::from(3i64)),
-                ]),
-            ])),
-            Box::new(vec![
-                field_a.clone(),
-                field_b.clone(),
-                field_c.clone(),
-                field_d.clone(),
-            ]),
-        );
-
-        // Check Display
-        assert_eq!(
-            format!("{}", scalar),
-            String::from("{A:23,B:false,C:Hello,D:{e:2,f:3}}")
-        );
-
-        // Check Debug
-        assert_eq!(
-            format!("{:?}", scalar),
-            String::from(
-                r#"Struct({A:Int32(23),B:Boolean(false),C:Utf8("Hello"),D:Struct({e:Int16(2),f:Int64(3)})})"#
-            )
-        );
-
-        // Convert to length-2 array
-        let array = scalar.to_array_of_size(2);
-
-        let expected = Arc::new(StructArray::from(vec![
-            (
-                field_a.clone(),
-                Arc::new(Int32Array::from_slice(&[23, 23])) as ArrayRef,
-            ),
-            (
-                field_b.clone(),
-                Arc::new(BooleanArray::from_slice(&[false, false])) as ArrayRef,
-            ),
-            (
-                field_c.clone(),
-                Arc::new(StringArray::from_slice(&["Hello", "Hello"])) as ArrayRef,
-            ),
-            (
-                field_d.clone(),
-                Arc::new(StructArray::from(vec![
-                    (
-                        field_e.clone(),
-                        Arc::new(Int16Array::from_slice(&[2, 2])) as ArrayRef,
-                    ),
-                    (
-                        field_f.clone(),
-                        Arc::new(Int64Array::from_slice(&[3, 3])) as ArrayRef,
-                    ),
-                ])) as ArrayRef,
-            ),
-        ])) as ArrayRef;
-
-        assert_eq!(&array, &expected);
-
-        // Construct from second element of ArrayRef
-        let constructed = ScalarValue::try_from_array(&expected, 1).unwrap();
-        assert_eq!(constructed, scalar);
-
-        // None version
-        let none_scalar = ScalarValue::try_from(array.data_type()).unwrap();
-        assert!(none_scalar.is_null());
-        assert_eq!(format!("{:?}", none_scalar), String::from("Struct(NULL)"));
-
-        // Construct with convenience From<Vec<(&str, ScalarValue)>>
-        let constructed = ScalarValue::from(vec![
-            ("A", ScalarValue::from(23)),
-            ("B", ScalarValue::from(false)),
-            ("C", ScalarValue::from("Hello")),
-            (
-                "D",
-                ScalarValue::from(vec![
-                    ("e", ScalarValue::from(2i16)),
-                    ("f", ScalarValue::from(3i64)),
-                ]),
-            ),
-        ]);
-        assert_eq!(constructed, scalar);
-
-        // Build Array from Vec of structs
-        let scalars = vec![
-            ScalarValue::from(vec![
-                ("A", ScalarValue::from(23)),
-                ("B", ScalarValue::from(false)),
-                ("C", ScalarValue::from("Hello")),
-                (
-                    "D",
-                    ScalarValue::from(vec![
-                        ("e", ScalarValue::from(2i16)),
-                        ("f", ScalarValue::from(3i64)),
-                    ]),
-                ),
-            ]),
-            ScalarValue::from(vec![
-                ("A", ScalarValue::from(7)),
-                ("B", ScalarValue::from(true)),
-                ("C", ScalarValue::from("World")),
-                (
-                    "D",
-                    ScalarValue::from(vec![
-                        ("e", ScalarValue::from(4i16)),
-                        ("f", ScalarValue::from(5i64)),
-                    ]),
-                ),
-            ]),
-            ScalarValue::from(vec![
-                ("A", ScalarValue::from(-1000)),
-                ("B", ScalarValue::from(true)),
-                ("C", ScalarValue::from("!!!!!")),
-                (
-                    "D",
-                    ScalarValue::from(vec![
-                        ("e", ScalarValue::from(6i16)),
-                        ("f", ScalarValue::from(7i64)),
-                    ]),
-                ),
-            ]),
-        ];
-        let array = ScalarValue::iter_to_array(scalars).unwrap();
-
-        let expected = Arc::new(StructArray::from(vec![
-            (
-                field_a,
-                Arc::new(Int32Array::from_slice(&[23, 7, -1000])) as ArrayRef,
-            ),
-            (
-                field_b,
-                Arc::new(BooleanArray::from_slice(&[false, true, true])) as ArrayRef,
-            ),
-            (
-                field_c,
-                Arc::new(StringArray::from_slice(&["Hello", "World", "!!!!!"]))
-                    as ArrayRef,
-            ),
-            (
-                field_d,
-                Arc::new(StructArray::from(vec![
-                    (
-                        field_e,
-                        Arc::new(Int16Array::from_slice(&[2, 4, 6])) as ArrayRef,
-                    ),
-                    (
-                        field_f,
-                        Arc::new(Int64Array::from_slice(&[3, 5, 7])) as ArrayRef,
-                    ),
-                ])) as ArrayRef,
-            ),
-        ])) as ArrayRef;
-
-        assert_eq!(&array, &expected);
-    }
-
-    #[test]
-    fn test_lists_in_struct() {
-        let field_a = Field::new("A", DataType::Utf8, false);
-        let field_primitive_list = Field::new(
-            "primitive_list",
-            DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
-            false,
-        );
-
-        // Define primitive list scalars
-        let l0 = ScalarValue::List(
-            Some(Box::new(vec![
-                ScalarValue::from(1i32),
-                ScalarValue::from(2i32),
-                ScalarValue::from(3i32),
-            ])),
-            Box::new(DataType::Int32),
-        );
-
-        let l1 = ScalarValue::List(
-            Some(Box::new(vec![
-                ScalarValue::from(4i32),
-                ScalarValue::from(5i32),
-            ])),
-            Box::new(DataType::Int32),
-        );
-
-        let l2 = ScalarValue::List(
-            Some(Box::new(vec![ScalarValue::from(6i32)])),
-            Box::new(DataType::Int32),
-        );
-
-        // Define struct scalars
-        let s0 = ScalarValue::from(vec![
-            ("A", ScalarValue::Utf8(Some(String::from("First")))),
-            ("primitive_list", l0),
-        ]);
-
-        let s1 = ScalarValue::from(vec![
-            ("A", ScalarValue::Utf8(Some(String::from("Second")))),
-            ("primitive_list", l1),
-        ]);
-
-        let s2 = ScalarValue::from(vec![
-            ("A", ScalarValue::Utf8(Some(String::from("Third")))),
-            ("primitive_list", l2),
-        ]);
-
-        // iter_to_array for struct scalars
-        let array =
-            ScalarValue::iter_to_array(vec![s0.clone(), s1.clone(), s2.clone()]).unwrap();
-        let array = array.as_any().downcast_ref::<StructArray>().unwrap();
-
-        let expected = StructArray::from(vec![
-            (
-                field_a.clone(),
-                Arc::new(StringArray::from_slice(&["First", "Second", "Third"]))
-                    as ArrayRef,
-            ),
-            (
-                field_primitive_list.clone(),
-                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
-                    Some(vec![Some(1), Some(2), Some(3)]),
-                    Some(vec![Some(4), Some(5)]),
-                    Some(vec![Some(6)]),
-                ])),
-            ),
-        ]);
-
-        assert_eq!(array, &expected);
-
-        // Define list-of-structs scalars
-        let nl0 = ScalarValue::List(
-            Some(Box::new(vec![s0.clone(), s1.clone()])),
-            Box::new(s0.get_datatype()),
-        );
-
-        let nl1 =
-            ScalarValue::List(Some(Box::new(vec![s2])), Box::new(s0.get_datatype()));
-
-        let nl2 =
-            ScalarValue::List(Some(Box::new(vec![s1])), Box::new(s0.get_datatype()));
-
-        // iter_to_array for list-of-struct
-        let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
-        let array = array.as_any().downcast_ref::<ListArray>().unwrap();
-
-        // Construct expected array with array builders
-        let field_a_builder = StringBuilder::new(4);
-        let primitive_value_builder = Int32Array::builder(8);
-        let field_primitive_list_builder = ListBuilder::new(primitive_value_builder);
-
-        let element_builder = StructBuilder::new(
-            vec![field_a, field_primitive_list],
-            vec![
-                Box::new(field_a_builder),
-                Box::new(field_primitive_list_builder),
-            ],
-        );
-        let mut list_builder = ListBuilder::new(element_builder);
-
-        list_builder
-            .values()
-            .field_builder::<StringBuilder>(0)
-            .unwrap()
-            .append_value("First")
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .values()
-            .append_value(1)
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .values()
-            .append_value(2)
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .values()
-            .append_value(3)
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .append(true)
-            .unwrap();
-        list_builder.values().append(true).unwrap();
-
-        list_builder
-            .values()
-            .field_builder::<StringBuilder>(0)
-            .unwrap()
-            .append_value("Second")
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .values()
-            .append_value(4)
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .values()
-            .append_value(5)
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .append(true)
-            .unwrap();
-        list_builder.values().append(true).unwrap();
-        list_builder.append(true).unwrap();
-
-        list_builder
-            .values()
-            .field_builder::<StringBuilder>(0)
-            .unwrap()
-            .append_value("Third")
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .values()
-            .append_value(6)
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .append(true)
-            .unwrap();
-        list_builder.values().append(true).unwrap();
-        list_builder.append(true).unwrap();
-
-        list_builder
-            .values()
-            .field_builder::<StringBuilder>(0)
-            .unwrap()
-            .append_value("Second")
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .values()
-            .append_value(4)
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .values()
-            .append_value(5)
-            .unwrap();
-        list_builder
-            .values()
-            .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
-            .unwrap()
-            .append(true)
-            .unwrap();
-        list_builder.values().append(true).unwrap();
-        list_builder.append(true).unwrap();
-
-        let expected = list_builder.finish();
-
-        assert_eq!(array, &expected);
-    }
-
-    #[test]
-    fn test_nested_lists() {
-        // Define inner list scalars
-        let l1 = ScalarValue::List(
-            Some(Box::new(vec![
-                ScalarValue::List(
-                    Some(Box::new(vec![
-                        ScalarValue::from(1i32),
-                        ScalarValue::from(2i32),
-                        ScalarValue::from(3i32),
-                    ])),
-                    Box::new(DataType::Int32),
-                ),
-                ScalarValue::List(
-                    Some(Box::new(vec![
-                        ScalarValue::from(4i32),
-                        ScalarValue::from(5i32),
-                    ])),
-                    Box::new(DataType::Int32),
-                ),
-            ])),
-            Box::new(DataType::List(Box::new(Field::new(
-                "item",
-                DataType::Int32,
-                true,
-            )))),
-        );
-
-        let l2 = ScalarValue::List(
-            Some(Box::new(vec![
-                ScalarValue::List(
-                    Some(Box::new(vec![ScalarValue::from(6i32)])),
-                    Box::new(DataType::Int32),
-                ),
-                ScalarValue::List(
-                    Some(Box::new(vec![
-                        ScalarValue::from(7i32),
-                        ScalarValue::from(8i32),
-                    ])),
-                    Box::new(DataType::Int32),
-                ),
-            ])),
-            Box::new(DataType::List(Box::new(Field::new(
-                "item",
-                DataType::Int32,
-                true,
-            )))),
-        );
-
-        let l3 = ScalarValue::List(
-            Some(Box::new(vec![ScalarValue::List(
-                Some(Box::new(vec![ScalarValue::from(9i32)])),
-                Box::new(DataType::Int32),
-            )])),
-            Box::new(DataType::List(Box::new(Field::new(
-                "item",
-                DataType::Int32,
-                true,
-            )))),
-        );
-
-        let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();
-        let array = array.as_any().downcast_ref::<ListArray>().unwrap();
-
-        // Construct expected array with array builders
-        let inner_builder = Int32Array::builder(8);
-        let middle_builder = ListBuilder::new(inner_builder);
-        let mut outer_builder = ListBuilder::new(middle_builder);
-
-        outer_builder.values().values().append_value(1).unwrap();
-        outer_builder.values().values().append_value(2).unwrap();
-        outer_builder.values().values().append_value(3).unwrap();
-        outer_builder.values().append(true).unwrap();
-
-        outer_builder.values().values().append_value(4).unwrap();
-        outer_builder.values().values().append_value(5).unwrap();
-        outer_builder.values().append(true).unwrap();
-        outer_builder.append(true).unwrap();
-
-        outer_builder.values().values().append_value(6).unwrap();
-        outer_builder.values().append(true).unwrap();
-
-        outer_builder.values().values().append_value(7).unwrap();
-        outer_builder.values().values().append_value(8).unwrap();
-        outer_builder.values().append(true).unwrap();
-        outer_builder.append(true).unwrap();
-
-        outer_builder.values().values().append_value(9).unwrap();
-        outer_builder.values().append(true).unwrap();
-        outer_builder.append(true).unwrap();
-
-        let expected = outer_builder.finish();
-
-        assert_eq!(array, &expected);
-    }
-
-    #[test]
-    fn scalar_timestamp_ns_utc_timezone() {
-        let scalar = ScalarValue::TimestampNanosecond(
-            Some(1599566400000000000),
-            Some("UTC".to_owned()),
-        );
-
-        assert_eq!(
-            scalar.get_datatype(),
-            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
-        );
-
-        let array = scalar.to_array();
-        assert_eq!(array.len(), 1);
-        assert_eq!(
-            array.data_type(),
-            &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
-        );
-
-        let newscalar = ScalarValue::try_from_array(&array, 0).unwrap();
-        assert_eq!(
-            newscalar.get_datatype(),
-            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
-        );
-    }
+  }
+
+  #[test]
+  fn scalar_partial_ordering() {
+    use ScalarValue::*;
+
+    assert_eq!(
+      Int64(Some(33)).partial_cmp(&Int64(Some(0))),
+      Some(Ordering::Greater)
+    );
+    assert_eq!(
+      Int64(Some(0)).partial_cmp(&Int64(Some(33))),
+      Some(Ordering::Less)
+    );
+    assert_eq!(
+      Int64(Some(33)).partial_cmp(&Int64(Some(33))),
+      Some(Ordering::Equal)
+    );
+    // For different data type, `partial_cmp` returns None.
+    assert_eq!(Int64(Some(33)).partial_cmp(&Int32(Some(33))), None);
+    assert_eq!(Int32(Some(33)).partial_cmp(&Int64(Some(33))), None);
+
+    assert_eq!(
+      List(
+        Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
+        Box::new(DataType::Int32),
+      )
+      .partial_cmp(&List(
+        Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
+        Box::new(DataType::Int32),
+      )),
+      Some(Ordering::Equal)
+    );
+
+    assert_eq!(
+      List(
+        Some(Box::new(vec![Int32(Some(10)), Int32(Some(5))])),
+        Box::new(DataType::Int32),
+      )
+      .partial_cmp(&List(
+        Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
+        Box::new(DataType::Int32),
+      )),
+      Some(Ordering::Greater)
+    );
+
+    assert_eq!(
+      List(
+        Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
+        Box::new(DataType::Int32),
+      )
+      .partial_cmp(&List(
+        Some(Box::new(vec![Int32(Some(10)), Int32(Some(5))])),
+        Box::new(DataType::Int32),
+      )),
+      Some(Ordering::Less)
+    );
+
+    // For different data type, `partial_cmp` returns None.
+    assert_eq!(
+      List(
+        Some(Box::new(vec![Int64(Some(1)), Int64(Some(5))])),
+        Box::new(DataType::Int64),
+      )
+      .partial_cmp(&List(
+        Some(Box::new(vec![Int32(Some(1)), Int32(Some(5))])),
+        Box::new(DataType::Int32),
+      )),
+      None
+    );
+
+    assert_eq!(
+      ScalarValue::from(vec![
+        ("A", ScalarValue::from(1.0)),
+        ("B", ScalarValue::from("Z")),
+      ])
+      .partial_cmp(&ScalarValue::from(vec![
+        ("A", ScalarValue::from(2.0)),
+        ("B", ScalarValue::from("A")),
+      ])),
+      Some(Ordering::Less)
+    );
+
+    // For different struct fields, `partial_cmp` returns None.
+    assert_eq!(
+      ScalarValue::from(vec![
+        ("A", ScalarValue::from(1.0)),
+        ("B", ScalarValue::from("Z")),
+      ])
+      .partial_cmp(&ScalarValue::from(vec![
+        ("a", ScalarValue::from(2.0)),
+        ("b", ScalarValue::from("A")),
+      ])),
+      None
+    );
+  }
+
+  #[test]
+  fn test_scalar_struct() {
+    let field_a = Field::new("A", DataType::Int32, false);
+    let field_b = Field::new("B", DataType::Boolean, false);
+    let field_c = Field::new("C", DataType::Utf8, false);
+
+    let field_e = Field::new("e", DataType::Int16, false);
+    let field_f = Field::new("f", DataType::Int64, false);
+    let field_d = Field::new(
+      "D",
+      DataType::Struct(vec![field_e.clone(), field_f.clone()]),
+      false,
+    );
+
+    let scalar = ScalarValue::Struct(
+      Some(Box::new(vec![
+        ScalarValue::Int32(Some(23)),
+        ScalarValue::Boolean(Some(false)),
+        ScalarValue::Utf8(Some("Hello".to_string())),
+        ScalarValue::from(vec![
+          ("e", ScalarValue::from(2i16)),
+          ("f", ScalarValue::from(3i64)),
+        ]),
+      ])),
+      Box::new(vec![
+        field_a.clone(),
+        field_b.clone(),
+        field_c.clone(),
+        field_d.clone(),
+      ]),
+    );
+
+    // Check Display
+    assert_eq!(
+      format!("{}", scalar),
+      String::from("{A:23,B:false,C:Hello,D:{e:2,f:3}}")
+    );
+
+    // Check Debug
+    assert_eq!(
+      format!("{:?}", scalar),
+      String::from(
+        r#"Struct({A:Int32(23),B:Boolean(false),C:Utf8("Hello"),D:Struct({e:Int16(2),f:Int64(3)})})"#
+      )
+    );
+
+    // Convert to length-2 array
+    let array = scalar.to_array_of_size(2);
+
+    let expected = Arc::new(StructArray::from(vec![
+      (
+        field_a.clone(),
+        Arc::new(Int32Array::from_slice(&[23, 23])) as ArrayRef,
+      ),
+      (
+        field_b.clone(),
+        Arc::new(BooleanArray::from_slice(&[false, false])) as ArrayRef,
+      ),
+      (
+        field_c.clone(),
+        Arc::new(StringArray::from_slice(&["Hello", "Hello"])) as ArrayRef,
+      ),
+      (
+        field_d.clone(),
+        Arc::new(StructArray::from(vec![
+          (
+            field_e.clone(),
+            Arc::new(Int16Array::from_slice(&[2, 2])) as ArrayRef,
+          ),
+          (
+            field_f.clone(),
+            Arc::new(Int64Array::from_slice(&[3, 3])) as ArrayRef,
+          ),
+        ])) as ArrayRef,
+      ),
+    ])) as ArrayRef;
+
+    assert_eq!(&array, &expected);
+
+    // Construct from second element of ArrayRef
+    let constructed = ScalarValue::try_from_array(&expected, 1).unwrap();
+    assert_eq!(constructed, scalar);
+
+    // None version
+    let none_scalar = ScalarValue::try_from(array.data_type()).unwrap();
+    assert!(none_scalar.is_null());
+    assert_eq!(format!("{:?}", none_scalar), String::from("Struct(NULL)"));
+
+    // Construct with convenience From<Vec<(&str, ScalarValue)>>
+    let constructed = ScalarValue::from(vec![
+      ("A", ScalarValue::from(23)),
+      ("B", ScalarValue::from(false)),
+      ("C", ScalarValue::from("Hello")),
+      (
+        "D",
+        ScalarValue::from(vec![
+          ("e", ScalarValue::from(2i16)),
+          ("f", ScalarValue::from(3i64)),
+        ]),
+      ),
+    ]);
+    assert_eq!(constructed, scalar);
+
+    // Build Array from Vec of structs
+    let scalars = vec![
+      ScalarValue::from(vec![
+        ("A", ScalarValue::from(23)),
+        ("B", ScalarValue::from(false)),
+        ("C", ScalarValue::from("Hello")),
+        (
+          "D",
+          ScalarValue::from(vec![
+            ("e", ScalarValue::from(2i16)),
+            ("f", ScalarValue::from(3i64)),
+          ]),
+        ),
+      ]),
+      ScalarValue::from(vec![
+        ("A", ScalarValue::from(7)),
+        ("B", ScalarValue::from(true)),
+        ("C", ScalarValue::from("World")),
+        (
+          "D",
+          ScalarValue::from(vec![
+            ("e", ScalarValue::from(4i16)),
+            ("f", ScalarValue::from(5i64)),
+          ]),
+        ),
+      ]),
+      ScalarValue::from(vec![
+        ("A", ScalarValue::from(-1000)),
+        ("B", ScalarValue::from(true)),
+        ("C", ScalarValue::from("!!!!!")),
+        (
+          "D",
+          ScalarValue::from(vec![
+            ("e", ScalarValue::from(6i16)),
+            ("f", ScalarValue::from(7i64)),
+          ]),
+        ),
+      ]),
+    ];
+    let array = ScalarValue::iter_to_array(scalars).unwrap();
+
+    let expected = Arc::new(StructArray::from(vec![
+      (
+        field_a,
+        Arc::new(Int32Array::from_slice(&[23, 7, -1000])) as ArrayRef,
+      ),
+      (
+        field_b,
+        Arc::new(BooleanArray::from_slice(&[false, true, true])) as ArrayRef,
+      ),
+      (
+        field_c,
+        Arc::new(StringArray::from_slice(&["Hello", "World", "!!!!!"])) as ArrayRef,
+      ),
+      (
+        field_d,
+        Arc::new(StructArray::from(vec![
+          (
+            field_e,
+            Arc::new(Int16Array::from_slice(&[2, 4, 6])) as ArrayRef,
+          ),
+          (
+            field_f,
+            Arc::new(Int64Array::from_slice(&[3, 5, 7])) as ArrayRef,
+          ),
+        ])) as ArrayRef,
+      ),
+    ])) as ArrayRef;
+
+    assert_eq!(&array, &expected);
+  }
+
+  #[test]
+  fn test_lists_in_struct() {
+    let field_a = Field::new("A", DataType::Utf8, false);
+    let field_primitive_list = Field::new(
+      "primitive_list",
+      DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
+      false,
+    );
+
+    // Define primitive list scalars
+    let l0 = ScalarValue::List(
+      Some(Box::new(vec![
+        ScalarValue::from(1i32),
+        ScalarValue::from(2i32),
+        ScalarValue::from(3i32),
+      ])),
+      Box::new(DataType::Int32),
+    );
+
+    let l1 = ScalarValue::List(
+      Some(Box::new(vec![
+        ScalarValue::from(4i32),
+        ScalarValue::from(5i32),
+      ])),
+      Box::new(DataType::Int32),
+    );
+
+    let l2 = ScalarValue::List(
+      Some(Box::new(vec![ScalarValue::from(6i32)])),
+      Box::new(DataType::Int32),
+    );
+
+    // Define struct scalars
+    let s0 = ScalarValue::from(vec![
+      ("A", ScalarValue::Utf8(Some(String::from("First")))),
+      ("primitive_list", l0),
+    ]);
+
+    let s1 = ScalarValue::from(vec![
+      ("A", ScalarValue::Utf8(Some(String::from("Second")))),
+      ("primitive_list", l1),
+    ]);
+
+    let s2 = ScalarValue::from(vec![
+      ("A", ScalarValue::Utf8(Some(String::from("Third")))),
+      ("primitive_list", l2),
+    ]);
+
+    // iter_to_array for struct scalars
+    let array =
+      ScalarValue::iter_to_array(vec![s0.clone(), s1.clone(), s2.clone()]).unwrap();
+    let array = array.as_any().downcast_ref::<StructArray>().unwrap();
+
+    let expected = StructArray::from(vec![
+      (
+        field_a.clone(),
+        Arc::new(StringArray::from_slice(&["First", "Second", "Third"])) as ArrayRef,
+      ),
+      (
+        field_primitive_list.clone(),
+        Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+          Some(vec![Some(1), Some(2), Some(3)]),
+          Some(vec![Some(4), Some(5)]),
+          Some(vec![Some(6)]),
+        ])),
+      ),
+    ]);
+
+    assert_eq!(array, &expected);
+
+    // Define list-of-structs scalars
+    let nl0 = ScalarValue::List(
+      Some(Box::new(vec![s0.clone(), s1.clone()])),
+      Box::new(s0.get_datatype()),
+    );
+
+    let nl1 = ScalarValue::List(Some(Box::new(vec![s2])), Box::new(s0.get_datatype()));
+
+    let nl2 = ScalarValue::List(Some(Box::new(vec![s1])), Box::new(s0.get_datatype()));
+
+    // iter_to_array for list-of-struct
+    let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
+    let array = array.as_any().downcast_ref::<ListArray>().unwrap();
+
+    // Construct expected array with array builders
+    let field_a_builder = StringBuilder::new(4);
+    let primitive_value_builder = Int32Array::builder(8);
+    let field_primitive_list_builder = ListBuilder::new(primitive_value_builder);
+
+    let element_builder = StructBuilder::new(
+      vec![field_a, field_primitive_list],
+      vec![
+        Box::new(field_a_builder),
+        Box::new(field_primitive_list_builder),
+      ],
+    );
+    let mut list_builder = ListBuilder::new(element_builder);
+
+    list_builder
+      .values()
+      .field_builder::<StringBuilder>(0)
+      .unwrap()
+      .append_value("First")
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .values()
+      .append_value(1)
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .values()
+      .append_value(2)
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .values()
+      .append_value(3)
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .append(true)
+      .unwrap();
+    list_builder.values().append(true).unwrap();
+
+    list_builder
+      .values()
+      .field_builder::<StringBuilder>(0)
+      .unwrap()
+      .append_value("Second")
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .values()
+      .append_value(4)
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .values()
+      .append_value(5)
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .append(true)
+      .unwrap();
+    list_builder.values().append(true).unwrap();
+    list_builder.append(true).unwrap();
+
+    list_builder
+      .values()
+      .field_builder::<StringBuilder>(0)
+      .unwrap()
+      .append_value("Third")
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .values()
+      .append_value(6)
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .append(true)
+      .unwrap();
+    list_builder.values().append(true).unwrap();
+    list_builder.append(true).unwrap();
+
+    list_builder
+      .values()
+      .field_builder::<StringBuilder>(0)
+      .unwrap()
+      .append_value("Second")
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .values()
+      .append_value(4)
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .values()
+      .append_value(5)
+      .unwrap();
+    list_builder
+      .values()
+      .field_builder::<ListBuilder<PrimitiveBuilder<Int32Type>>>(1)
+      .unwrap()
+      .append(true)
+      .unwrap();
+    list_builder.values().append(true).unwrap();
+    list_builder.append(true).unwrap();
+
+    let expected = list_builder.finish();
+
+    assert_eq!(array, &expected);
+  }
+
+  #[test]
+  fn test_nested_lists() {
+    // Define inner list scalars
+    let l1 = ScalarValue::List(
+      Some(Box::new(vec![
+        ScalarValue::List(
+          Some(Box::new(vec![
+            ScalarValue::from(1i32),
+            ScalarValue::from(2i32),
+            ScalarValue::from(3i32),
+          ])),
+          Box::new(DataType::Int32),
+        ),
+        ScalarValue::List(
+          Some(Box::new(vec![
+            ScalarValue::from(4i32),
+            ScalarValue::from(5i32),
+          ])),
+          Box::new(DataType::Int32),
+        ),
+      ])),
+      Box::new(DataType::List(Box::new(Field::new(
+        "item",
+        DataType::Int32,
+        true,
+      )))),
+    );
+
+    let l2 = ScalarValue::List(
+      Some(Box::new(vec![
+        ScalarValue::List(
+          Some(Box::new(vec![ScalarValue::from(6i32)])),
+          Box::new(DataType::Int32),
+        ),
+        ScalarValue::List(
+          Some(Box::new(vec![
+            ScalarValue::from(7i32),
+            ScalarValue::from(8i32),
+          ])),
+          Box::new(DataType::Int32),
+        ),
+      ])),
+      Box::new(DataType::List(Box::new(Field::new(
+        "item",
+        DataType::Int32,
+        true,
+      )))),
+    );
+
+    let l3 = ScalarValue::List(
+      Some(Box::new(vec![ScalarValue::List(
+        Some(Box::new(vec![ScalarValue::from(9i32)])),
+        Box::new(DataType::Int32),
+      )])),
+      Box::new(DataType::List(Box::new(Field::new(
+        "item",
+        DataType::Int32,
+        true,
+      )))),
+    );
+
+    let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();
+    let array = array.as_any().downcast_ref::<ListArray>().unwrap();
+
+    // Construct expected array with array builders
+    let inner_builder = Int32Array::builder(8);
+    let middle_builder = ListBuilder::new(inner_builder);
+    let mut outer_builder = ListBuilder::new(middle_builder);
+
+    outer_builder.values().values().append_value(1).unwrap();
+    outer_builder.values().values().append_value(2).unwrap();
+    outer_builder.values().values().append_value(3).unwrap();
+    outer_builder.values().append(true).unwrap();
+
+    outer_builder.values().values().append_value(4).unwrap();
+    outer_builder.values().values().append_value(5).unwrap();
+    outer_builder.values().append(true).unwrap();
+    outer_builder.append(true).unwrap();
+
+    outer_builder.values().values().append_value(6).unwrap();
+    outer_builder.values().append(true).unwrap();
+
+    outer_builder.values().values().append_value(7).unwrap();
+    outer_builder.values().values().append_value(8).unwrap();
+    outer_builder.values().append(true).unwrap();
+    outer_builder.append(true).unwrap();
+
+    outer_builder.values().values().append_value(9).unwrap();
+    outer_builder.values().append(true).unwrap();
+    outer_builder.append(true).unwrap();
+
+    let expected = outer_builder.finish();
+
+    assert_eq!(array, &expected);
+  }
+
+  #[test]
+  fn scalar_timestamp_ns_utc_timezone() {
+    let scalar =
+      ScalarValue::TimestampNanosecond(Some(1599566400000000000), Some("UTC".to_owned()));
+
+    assert_eq!(
+      scalar.get_datatype(),
+      DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
+    );
+
+    let array = scalar.to_array();
+    assert_eq!(array.len(), 1);
+    assert_eq!(
+      array.data_type(),
+      &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
+    );
+
+    let newscalar = ScalarValue::try_from_array(&array, 0).unwrap();
+    assert_eq!(
+      newscalar.get_datatype(),
+      DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
+    );
+  }
 }