You are viewing a plain text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 05:23:58 UTC

[arrow-datafusion] branch datafusion-common-column updated: move column, dfschema, etc. to common module

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch datafusion-common-column
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/datafusion-common-column by this push:
     new 3bf434a  move column, dfschema, etc. to common module
3bf434a is described below

commit 3bf434abb5b268751de7a09b4702739e7fc11f37
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:23:43 2022 +0800

    move column, dfschema, etc. to common module
---
 datafusion-common/src/column.rs         |   2 +-
 datafusion-common/src/dfschema.rs       |  34 ++
 datafusion-common/src/lib.rs            |   2 +-
 datafusion/src/logical_plan/builder.rs  |  48 ++-
 datafusion/src/logical_plan/dfschema.rs | 669 +-------------------------------
 datafusion/src/logical_plan/expr.rs     | 174 +--------
 6 files changed, 72 insertions(+), 857 deletions(-)

diff --git a/datafusion-common/src/column.rs b/datafusion-common/src/column.rs
index 5d68916..02faa24 100644
--- a/datafusion-common/src/column.rs
+++ b/datafusion-common/src/column.rs
@@ -75,7 +75,7 @@ impl Column {
     }
 
     // Internal implementation of normalize
-    fn normalize_with_schemas(
+    pub fn normalize_with_schemas(
         self,
         schemas: &[&Arc<DFSchema>],
         using_columns: &[HashSet<Column>],
diff --git a/datafusion-common/src/dfschema.rs b/datafusion-common/src/dfschema.rs
index 453932a..46321c3 100644
--- a/datafusion-common/src/dfschema.rs
+++ b/datafusion-common/src/dfschema.rs
@@ -402,6 +402,40 @@ impl Display for DFSchema {
     }
 }
 
+/// Provides schema information needed by `Expr` methods such as
+/// `Expr::nullable` and `Expr::data_type`.
+///
+/// Note that this trait is implemented for [DFSchema] and for any type
+/// that is `AsRef<DFSchema>`, such as `Arc<DFSchema>` ([DFSchemaRef]).
+pub trait ExprSchema {
+    /// Returns whether the column referenced by `col` is nullable.
+    fn nullable(&self, col: &Column) -> Result<bool>;
+
+    /// Returns the data type of the column referenced by `col`.
+    fn data_type(&self, col: &Column) -> Result<&DataType>;
+}
+
+// Blanket impl for any `P: AsRef<DFSchema>` (e.g. `Arc<DFSchema>`); delegates via `as_ref()`
+impl<P: AsRef<DFSchema>> ExprSchema for P {
+    fn nullable(&self, col: &Column) -> Result<bool> {
+        self.as_ref().nullable(col)
+    }
+
+    fn data_type(&self, col: &Column) -> Result<&DataType> {
+        self.as_ref().data_type(col)
+    }
+}
+
+impl ExprSchema for DFSchema { // resolve `col` via `field_from_column`; `?` propagates lookup errors
+    fn nullable(&self, col: &Column) -> Result<bool> {
+        Ok(self.field_from_column(col)?.is_nullable())
+    }
+
+    fn data_type(&self, col: &Column) -> Result<&DataType> {
+        Ok(self.field_from_column(col)?.data_type())
+    }
+}
+
 /// DFField wraps an Arrow field and adds an optional qualifier
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct DFField {
diff --git a/datafusion-common/src/lib.rs b/datafusion-common/src/lib.rs
index 8090fc0..11f9bbb 100644
--- a/datafusion-common/src/lib.rs
+++ b/datafusion-common/src/lib.rs
@@ -20,5 +20,5 @@ mod dfschema;
 mod error;
 
 pub use column::Column;
-pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
+pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
 pub use error::{DataFusionError, Result};
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 613c8e9..d81fa9d 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -595,6 +595,17 @@ impl LogicalPlanBuilder {
         self.join_detailed(right, join_type, join_keys, false)
     }
 
+    /// Normalize `column` against `plan` by calling
+    /// `Column::normalize_with_schemas` with `plan.all_schemas()` and
+    /// `plan.using_columns()`; errors from either call are propagated.
+    fn normalize(plan: &LogicalPlan, column: impl Into<Column>) -> Result<Column> {
+        let schemas = plan.all_schemas();
+        let using_columns = plan.using_columns()?;
+        column
+            .into()
+            .normalize_with_schemas(&schemas, &using_columns)
+    }
+
     /// Apply a join with on constraint and specified null equality
     /// If null_equals_null is true then null == null, else null != null
     pub fn join_detailed(
@@ -633,7 +644,10 @@ impl LogicalPlanBuilder {
                             match (l_is_left, l_is_right, r_is_left, r_is_right) {
                                 (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
                                 (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
-                                _ => (l.normalize(&self.plan), r.normalize(right)),
+                                _ => (
+                                    Self::normalize(&self.plan, l),
+                                    Self::normalize(right, r),
+                                ),
                             }
                         }
                         (Some(lr), None) => {
@@ -643,9 +657,12 @@ impl LogicalPlanBuilder {
                                 right.schema().field_with_qualified_name(lr, &l.name);
 
                             match (l_is_left, l_is_right) {
-                                (Ok(_), _) => (Ok(l), r.normalize(right)),
-                                (_, Ok(_)) => (r.normalize(&self.plan), Ok(l)),
-                                _ => (l.normalize(&self.plan), r.normalize(right)),
+                                (Ok(_), _) => (Ok(l), Self::normalize(right, r)),
+                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
+                                _ => (
+                                    Self::normalize(&self.plan, l),
+                                    Self::normalize(right, r),
+                                ),
                             }
                         }
                         (None, Some(rr)) => {
@@ -655,22 +672,25 @@ impl LogicalPlanBuilder {
                                 right.schema().field_with_qualified_name(rr, &r.name);
 
                             match (r_is_left, r_is_right) {
-                                (Ok(_), _) => (Ok(r), l.normalize(right)),
-                                (_, Ok(_)) => (l.normalize(&self.plan), Ok(r)),
-                                _ => (l.normalize(&self.plan), r.normalize(right)),
+                                (Ok(_), _) => (Ok(r), Self::normalize(right, l)),
+                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
+                                _ => (
+                                    Self::normalize(&self.plan, l),
+                                    Self::normalize(right, r),
+                                ),
                             }
                         }
                         (None, None) => {
                             let mut swap = false;
-                            let left_key =
-                                l.clone().normalize(&self.plan).or_else(|_| {
+                            let left_key = Self::normalize(&self.plan, l.clone())
+                                .or_else(|_| {
                                     swap = true;
-                                    l.normalize(right)
+                                    Self::normalize(right, l)
                                 });
                             if swap {
-                                (r.normalize(&self.plan), left_key)
+                                (Self::normalize(&self.plan, r), left_key)
                             } else {
-                                (left_key, r.normalize(right))
+                                (left_key, Self::normalize(right, r))
                             }
                         }
                     }
@@ -705,11 +725,11 @@ impl LogicalPlanBuilder {
         let left_keys: Vec<Column> = using_keys
             .clone()
             .into_iter()
-            .map(|c| c.into().normalize(&self.plan))
+            .map(|c| Self::normalize(&self.plan, c))
             .collect::<Result<_>>()?;
         let right_keys: Vec<Column> = using_keys
             .into_iter()
-            .map(|c| c.into().normalize(right))
+            .map(|c| Self::normalize(right, c))
             .collect::<Result<_>>()?;
 
         let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect();
diff --git a/datafusion/src/logical_plan/dfschema.rs b/datafusion/src/logical_plan/dfschema.rs
index 7b6471f..eb62428 100644
--- a/datafusion/src/logical_plan/dfschema.rs
+++ b/datafusion/src/logical_plan/dfschema.rs
@@ -18,671 +18,4 @@
 //! DFSchema is an extended schema struct that DataFusion uses to provide support for
 //! fields with optional relation names.
 
-use std::collections::HashSet;
-use std::convert::TryFrom;
-use std::sync::Arc;
-
-use crate::error::{DataFusionError, Result};
-use crate::logical_plan::Column;
-
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use std::fmt::{Display, Formatter};
-
-/// A reference-counted reference to a `DFSchema`.
-pub type DFSchemaRef = Arc<DFSchema>;
-
-/// DFSchema wraps an Arrow schema and adds relation names
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct DFSchema {
-    /// Fields
-    fields: Vec<DFField>,
-}
-
-impl DFSchema {
-    /// Creates an empty `DFSchema`
-    pub fn empty() -> Self {
-        Self { fields: vec![] }
-    }
-
-    /// Create a new `DFSchema`
-    pub fn new(fields: Vec<DFField>) -> Result<Self> {
-        let mut qualified_names = HashSet::new();
-        let mut unqualified_names = HashSet::new();
-
-        for field in &fields {
-            if let Some(qualifier) = field.qualifier() {
-                if !qualified_names.insert((qualifier, field.name())) {
-                    return Err(DataFusionError::Plan(format!(
-                        "Schema contains duplicate qualified field name '{}'",
-                        field.qualified_name()
-                    )));
-                }
-            } else if !unqualified_names.insert(field.name()) {
-                return Err(DataFusionError::Plan(format!(
-                    "Schema contains duplicate unqualified field name '{}'",
-                    field.name()
-                )));
-            }
-        }
-
-        // check for mix of qualified and unqualified field with same unqualified name
-        // note that we need to sort the contents of the HashSet first so that errors are
-        // deterministic
-        let mut qualified_names = qualified_names
-            .iter()
-            .map(|(l, r)| (l.to_owned(), r.to_owned()))
-            .collect::<Vec<(&String, &String)>>();
-        qualified_names.sort_by(|a, b| {
-            let a = format!("{}.{}", a.0, a.1);
-            let b = format!("{}.{}", b.0, b.1);
-            a.cmp(&b)
-        });
-        for (qualifier, name) in &qualified_names {
-            if unqualified_names.contains(name) {
-                return Err(DataFusionError::Plan(format!(
-                    "Schema contains qualified field name '{}.{}' \
-                    and unqualified field name '{}' which would be ambiguous",
-                    qualifier, name, name
-                )));
-            }
-        }
-        Ok(Self { fields })
-    }
-
-    /// Create a `DFSchema` from an Arrow schema
-    pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) -> Result<Self> {
-        Self::new(
-            schema
-                .fields()
-                .iter()
-                .map(|f| DFField::from_qualified(qualifier, f.clone()))
-                .collect(),
-        )
-    }
-
-    /// Combine two schemas
-    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
-        let mut fields = self.fields.clone();
-        fields.extend_from_slice(schema.fields().as_slice());
-        Self::new(fields)
-    }
-
-    /// Merge a schema into self
-    pub fn merge(&mut self, other_schema: &DFSchema) {
-        for field in other_schema.fields() {
-            // skip duplicate columns
-            let duplicated_field = match field.qualifier() {
-                Some(q) => self.field_with_name(Some(q.as_str()), field.name()).is_ok(),
-                // for unqualifed columns, check as unqualified name
-                None => self.field_with_unqualified_name(field.name()).is_ok(),
-            };
-            if !duplicated_field {
-                self.fields.push(field.clone());
-            }
-        }
-    }
-
-    /// Get a list of fields
-    pub fn fields(&self) -> &Vec<DFField> {
-        &self.fields
-    }
-
-    /// Returns an immutable reference of a specific `Field` instance selected using an
-    /// offset within the internal `fields` vector
-    pub fn field(&self, i: usize) -> &DFField {
-        &self.fields[i]
-    }
-
-    /// Find the index of the column with the given unqualified name
-    pub fn index_of(&self, name: &str) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            if self.fields[i].name() == name {
-                return Ok(i);
-            }
-        }
-        Err(DataFusionError::Plan(format!(
-            "No field named '{}'. Valid fields are {}.",
-            name,
-            self.get_field_names()
-        )))
-    }
-
-    fn index_of_column_by_name(
-        &self,
-        qualifier: Option<&str>,
-        name: &str,
-    ) -> Result<usize> {
-        let mut matches = self
-            .fields
-            .iter()
-            .enumerate()
-            .filter(|(_, field)| match (qualifier, &field.qualifier) {
-                // field to lookup is qualified.
-                // current field is qualified and not shared between relations, compare both
-                // qualifier and name.
-                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
-                // field to lookup is qualified but current field is unqualified.
-                (Some(_), None) => false,
-                // field to lookup is unqualified, no need to compare qualifier
-                (None, Some(_)) | (None, None) => field.name() == name,
-            })
-            .map(|(idx, _)| idx);
-        match matches.next() {
-            None => Err(DataFusionError::Plan(format!(
-                "No field named '{}.{}'. Valid fields are {}.",
-                qualifier.unwrap_or("<unqualified>"),
-                name,
-                self.get_field_names()
-            ))),
-            Some(idx) => match matches.next() {
-                None => Ok(idx),
-                // found more than one matches
-                Some(_) => Err(DataFusionError::Internal(format!(
-                    "Ambiguous reference to qualified field named '{}.{}'",
-                    qualifier.unwrap_or("<unqualified>"),
-                    name
-                ))),
-            },
-        }
-    }
-
-    /// Find the index of the column with the given qualifier and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        self.index_of_column_by_name(col.relation.as_deref(), &col.name)
-    }
-
-    /// Find the field with the given name
-    pub fn field_with_name(
-        &self,
-        qualifier: Option<&str>,
-        name: &str,
-    ) -> Result<&DFField> {
-        if let Some(qualifier) = qualifier {
-            self.field_with_qualified_name(qualifier, name)
-        } else {
-            self.field_with_unqualified_name(name)
-        }
-    }
-
-    /// Find all fields match the given name
-    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
-        self.fields
-            .iter()
-            .filter(|field| field.name() == name)
-            .collect()
-    }
-
-    /// Find the field with the given name
-    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> {
-        let matches = self.fields_with_unqualified_name(name);
-        match matches.len() {
-            0 => Err(DataFusionError::Plan(format!(
-                "No field with unqualified name '{}'. Valid fields are {}.",
-                name,
-                self.get_field_names()
-            ))),
-            1 => Ok(matches[0]),
-            _ => Err(DataFusionError::Plan(format!(
-                "Ambiguous reference to field named '{}'",
-                name
-            ))),
-        }
-    }
-
-    /// Find the field with the given qualified name
-    pub fn field_with_qualified_name(
-        &self,
-        qualifier: &str,
-        name: &str,
-    ) -> Result<&DFField> {
-        let idx = self.index_of_column_by_name(Some(qualifier), name)?;
-        Ok(self.field(idx))
-    }
-
-    /// Find the field with the given qualified column
-    pub fn field_from_column(&self, column: &Column) -> Result<&DFField> {
-        match &column.relation {
-            Some(r) => self.field_with_qualified_name(r, &column.name),
-            None => self.field_with_unqualified_name(&column.name),
-        }
-    }
-
-    /// Check to see if unqualified field names matches field names in Arrow schema
-    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
-        self.fields
-            .iter()
-            .zip(arrow_schema.fields().iter())
-            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
-    }
-
-    /// Strip all field qualifier in schema
-    pub fn strip_qualifiers(self) -> Self {
-        DFSchema {
-            fields: self
-                .fields
-                .into_iter()
-                .map(|f| f.strip_qualifier())
-                .collect(),
-        }
-    }
-
-    /// Replace all field qualifier with new value in schema
-    pub fn replace_qualifier(self, qualifier: &str) -> Self {
-        DFSchema {
-            fields: self
-                .fields
-                .into_iter()
-                .map(|f| {
-                    DFField::new(
-                        Some(qualifier),
-                        f.name(),
-                        f.data_type().to_owned(),
-                        f.is_nullable(),
-                    )
-                })
-                .collect(),
-        }
-    }
-
-    /// Get comma-seperated list of field names for use in error messages
-    fn get_field_names(&self) -> String {
-        self.fields
-            .iter()
-            .map(|f| match f.qualifier() {
-                Some(qualifier) => format!("'{}.{}'", qualifier, f.name()),
-                None => format!("'{}'", f.name()),
-            })
-            .collect::<Vec<_>>()
-            .join(", ")
-    }
-}
-
-impl From<DFSchema> for Schema {
-    /// Convert DFSchema into a Schema
-    fn from(df_schema: DFSchema) -> Self {
-        Schema::new(
-            df_schema
-                .fields
-                .into_iter()
-                .map(|f| {
-                    if f.qualifier().is_some() {
-                        Field::new(
-                            f.name().as_str(),
-                            f.data_type().to_owned(),
-                            f.is_nullable(),
-                        )
-                    } else {
-                        f.field
-                    }
-                })
-                .collect(),
-        )
-    }
-}
-
-impl From<&DFSchema> for Schema {
-    /// Convert DFSchema reference into a Schema
-    fn from(df_schema: &DFSchema) -> Self {
-        Schema::new(df_schema.fields.iter().map(|f| f.field.clone()).collect())
-    }
-}
-
-/// Create a `DFSchema` from an Arrow schema
-impl TryFrom<Schema> for DFSchema {
-    type Error = DataFusionError;
-    fn try_from(schema: Schema) -> std::result::Result<Self, Self::Error> {
-        Self::new(
-            schema
-                .fields()
-                .iter()
-                .map(|f| DFField::from(f.clone()))
-                .collect(),
-        )
-    }
-}
-
-impl From<DFSchema> for SchemaRef {
-    fn from(df_schema: DFSchema) -> Self {
-        SchemaRef::new(df_schema.into())
-    }
-}
-
-/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
-pub trait ToDFSchema
-where
-    Self: Sized,
-{
-    /// Attempt to create a DSSchema
-    #[allow(clippy::wrong_self_convention)]
-    fn to_dfschema(self) -> Result<DFSchema>;
-
-    /// Attempt to create a DSSchemaRef
-    #[allow(clippy::wrong_self_convention)]
-    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
-        Ok(Arc::new(self.to_dfschema()?))
-    }
-}
-
-impl ToDFSchema for Schema {
-    #[allow(clippy::wrong_self_convention)]
-    fn to_dfschema(self) -> Result<DFSchema> {
-        DFSchema::try_from(self)
-    }
-}
-
-impl ToDFSchema for SchemaRef {
-    #[allow(clippy::wrong_self_convention)]
-    fn to_dfschema(self) -> Result<DFSchema> {
-        // Attempt to use the Schema directly if there are no other
-        // references, otherwise clone
-        match Self::try_unwrap(self) {
-            Ok(schema) => DFSchema::try_from(schema),
-            Err(schemaref) => DFSchema::try_from(schemaref.as_ref().clone()),
-        }
-    }
-}
-
-impl ToDFSchema for Vec<DFField> {
-    fn to_dfschema(self) -> Result<DFSchema> {
-        DFSchema::new(self)
-    }
-}
-
-impl Display for DFSchema {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(
-            f,
-            "{}",
-            self.fields
-                .iter()
-                .map(|field| field.qualified_name())
-                .collect::<Vec<String>>()
-                .join(", ")
-        )
-    }
-}
-
-/// DFField wraps an Arrow field and adds an optional qualifier
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct DFField {
-    /// Optional qualifier (usually a table or relation name)
-    qualifier: Option<String>,
-    /// Arrow field definition
-    field: Field,
-}
-
-impl DFField {
-    /// Creates a new `DFField`
-    pub fn new(
-        qualifier: Option<&str>,
-        name: &str,
-        data_type: DataType,
-        nullable: bool,
-    ) -> Self {
-        DFField {
-            qualifier: qualifier.map(|s| s.to_owned()),
-            field: Field::new(name, data_type, nullable),
-        }
-    }
-
-    /// Create an unqualified field from an existing Arrow field
-    pub fn from(field: Field) -> Self {
-        Self {
-            qualifier: None,
-            field,
-        }
-    }
-
-    /// Create a qualified field from an existing Arrow field
-    pub fn from_qualified(qualifier: &str, field: Field) -> Self {
-        Self {
-            qualifier: Some(qualifier.to_owned()),
-            field,
-        }
-    }
-
-    /// Returns an immutable reference to the `DFField`'s unqualified name
-    pub fn name(&self) -> &String {
-        self.field.name()
-    }
-
-    /// Returns an immutable reference to the `DFField`'s data-type
-    pub fn data_type(&self) -> &DataType {
-        self.field.data_type()
-    }
-
-    /// Indicates whether this `DFField` supports null values
-    pub fn is_nullable(&self) -> bool {
-        self.field.is_nullable()
-    }
-
-    /// Returns a string to the `DFField`'s qualified name
-    pub fn qualified_name(&self) -> String {
-        if let Some(qualifier) = &self.qualifier {
-            format!("{}.{}", qualifier, self.field.name())
-        } else {
-            self.field.name().to_owned()
-        }
-    }
-
-    /// Builds a qualified column based on self
-    pub fn qualified_column(&self) -> Column {
-        Column {
-            relation: self.qualifier.clone(),
-            name: self.field.name().to_string(),
-        }
-    }
-
-    /// Builds an unqualified column based on self
-    pub fn unqualified_column(&self) -> Column {
-        Column {
-            relation: None,
-            name: self.field.name().to_string(),
-        }
-    }
-
-    /// Get the optional qualifier
-    pub fn qualifier(&self) -> Option<&String> {
-        self.qualifier.as_ref()
-    }
-
-    /// Get the arrow field
-    pub fn field(&self) -> &Field {
-        &self.field
-    }
-
-    /// Return field with qualifier stripped
-    pub fn strip_qualifier(mut self) -> Self {
-        self.qualifier = None;
-        self
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use arrow::datatypes::DataType;
-
-    #[test]
-    fn from_unqualified_field() {
-        let field = Field::new("c0", DataType::Boolean, true);
-        let field = DFField::from(field);
-        assert_eq!("c0", field.name());
-        assert_eq!("c0", field.qualified_name());
-    }
-
-    #[test]
-    fn from_qualified_field() {
-        let field = Field::new("c0", DataType::Boolean, true);
-        let field = DFField::from_qualified("t1", field);
-        assert_eq!("c0", field.name());
-        assert_eq!("t1.c0", field.qualified_name());
-    }
-
-    #[test]
-    fn from_unqualified_schema() -> Result<()> {
-        let schema = DFSchema::try_from(test_schema_1())?;
-        assert_eq!("c0, c1", schema.to_string());
-        Ok(())
-    }
-
-    #[test]
-    fn from_qualified_schema() -> Result<()> {
-        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        assert_eq!("t1.c0, t1.c1", schema.to_string());
-        Ok(())
-    }
-
-    #[test]
-    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
-        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let arrow_schema: Schema = schema.into();
-        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, \
-        Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }";
-        assert_eq!(expected, arrow_schema.to_string());
-        Ok(())
-    }
-
-    #[test]
-    fn join_qualified() -> Result<()> {
-        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
-        let join = left.join(&right)?;
-        assert_eq!("t1.c0, t1.c1, t2.c0, t2.c1", join.to_string());
-        // test valid access
-        assert!(join.field_with_qualified_name("t1", "c0").is_ok());
-        assert!(join.field_with_qualified_name("t2", "c0").is_ok());
-        // test invalid access
-        assert!(join.field_with_unqualified_name("c0").is_err());
-        assert!(join.field_with_unqualified_name("t1.c0").is_err());
-        assert!(join.field_with_unqualified_name("t2.c0").is_err());
-        Ok(())
-    }
-
-    #[test]
-    fn join_qualified_duplicate() -> Result<()> {
-        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let join = left.join(&right);
-        assert!(join.is_err());
-        assert_eq!(
-            "Error during planning: Schema contains duplicate \
-        qualified field name \'t1.c0\'",
-            &format!("{}", join.err().unwrap())
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn join_unqualified_duplicate() -> Result<()> {
-        let left = DFSchema::try_from(test_schema_1())?;
-        let right = DFSchema::try_from(test_schema_1())?;
-        let join = left.join(&right);
-        assert!(join.is_err());
-        assert_eq!(
-            "Error during planning: Schema contains duplicate \
-        unqualified field name \'c0\'",
-            &format!("{}", join.err().unwrap())
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn join_mixed() -> Result<()> {
-        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let right = DFSchema::try_from(test_schema_2())?;
-        let join = left.join(&right)?;
-        assert_eq!("t1.c0, t1.c1, c100, c101", join.to_string());
-        // test valid access
-        assert!(join.field_with_qualified_name("t1", "c0").is_ok());
-        assert!(join.field_with_unqualified_name("c0").is_ok());
-        assert!(join.field_with_unqualified_name("c100").is_ok());
-        assert!(join.field_with_name(None, "c100").is_ok());
-        // test invalid access
-        assert!(join.field_with_unqualified_name("t1.c0").is_err());
-        assert!(join.field_with_unqualified_name("t1.c100").is_err());
-        assert!(join.field_with_qualified_name("", "c100").is_err());
-        Ok(())
-    }
-
-    #[test]
-    fn join_mixed_duplicate() -> Result<()> {
-        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let right = DFSchema::try_from(test_schema_1())?;
-        let join = left.join(&right);
-        assert!(join.is_err());
-        assert_eq!(
-            "Error during planning: Schema contains qualified \
-        field name \'t1.c0\' and unqualified field name \'c0\' which would be ambiguous",
-            &format!("{}", join.err().unwrap())
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn helpful_error_messages() -> Result<()> {
-        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
-        let expected_help = "Valid fields are \'t1.c0\', \'t1.c1\'.";
-        assert!(schema
-            .field_with_qualified_name("x", "y")
-            .unwrap_err()
-            .to_string()
-            .contains(expected_help));
-        assert!(schema
-            .field_with_unqualified_name("y")
-            .unwrap_err()
-            .to_string()
-            .contains(expected_help));
-        assert!(schema
-            .index_of("y")
-            .unwrap_err()
-            .to_string()
-            .contains(expected_help));
-        Ok(())
-    }
-
-    #[test]
-    fn into() {
-        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
-        let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Int64, true)]);
-        let arrow_schema_ref = Arc::new(arrow_schema.clone());
-
-        let df_schema =
-            DFSchema::new(vec![DFField::new(None, "c0", DataType::Int64, true)]).unwrap();
-        let df_schema_ref = Arc::new(df_schema.clone());
-
-        {
-            let arrow_schema = arrow_schema.clone();
-            let arrow_schema_ref = arrow_schema_ref.clone();
-
-            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
-            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
-        }
-
-        {
-            let arrow_schema = arrow_schema.clone();
-            let arrow_schema_ref = arrow_schema_ref.clone();
-
-            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
-            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
-        }
-
-        // Now, consume the refs
-        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
-        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
-    }
-
-    fn test_schema_1() -> Schema {
-        Schema::new(vec![
-            Field::new("c0", DataType::Boolean, true),
-            Field::new("c1", DataType::Boolean, true),
-        ])
-    }
-
-    fn test_schema_2() -> Schema {
-        Schema::new(vec![
-            Field::new("c100", DataType::Boolean, true),
-            Field::new("c101", DataType::Boolean, true),
-        ])
-    }
-}
+pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 300c751..fba7d81 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -34,152 +34,14 @@ use crate::physical_plan::{
 use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
 use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
 use arrow::{compute::can_cast_types, datatypes::DataType};
+pub use datafusion_common::{Column, ExprSchema};
 use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
 use std::collections::{HashMap, HashSet};
-use std::convert::Infallible;
 use std::fmt;
 use std::hash::{BuildHasher, Hash, Hasher};
 use std::ops::Not;
-use std::str::FromStr;
 use std::sync::Arc;
 
-/// A named reference to a qualified field in a schema.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
-pub struct Column {
-    /// relation/table name.
-    pub relation: Option<String>,
-    /// field/column name.
-    pub name: String,
-}
-
-impl Column {
-    /// Create Column from unqualified name.
-    pub fn from_name(name: impl Into<String>) -> Self {
-        Self {
-            relation: None,
-            name: name.into(),
-        }
-    }
-
-    /// Deserialize a fully qualified name string into a column
-    pub fn from_qualified_name(flat_name: &str) -> Self {
-        use sqlparser::tokenizer::Token;
-
-        let dialect = sqlparser::dialect::GenericDialect {};
-        let mut tokenizer = sqlparser::tokenizer::Tokenizer::new(&dialect, flat_name);
-        if let Ok(tokens) = tokenizer.tokenize() {
-            if let [Token::Word(relation), Token::Period, Token::Word(name)] =
-                tokens.as_slice()
-            {
-                return Column {
-                    relation: Some(relation.value.clone()),
-                    name: name.value.clone(),
-                };
-            }
-        }
-        // any expression that's not in the form of `foo.bar` will be treated as unqualified column
-        // name
-        Column {
-            relation: None,
-            name: String::from(flat_name),
-        }
-    }
-
-    /// Serialize column into a flat name string
-    pub fn flat_name(&self) -> String {
-        match &self.relation {
-            Some(r) => format!("{}.{}", r, self.name),
-            None => self.name.clone(),
-        }
-    }
-
-    /// Normalizes `self` if is unqualified (has no relation name)
-    /// with an explicit qualifier from the first matching input
-    /// schemas.
-    ///
-    /// For example, `foo` will be normalized to `t.foo` if there is a
-    /// column named `foo` in a relation named `t` found in `schemas`
-    pub fn normalize(self, plan: &LogicalPlan) -> Result<Self> {
-        let schemas = plan.all_schemas();
-        let using_columns = plan.using_columns()?;
-        self.normalize_with_schemas(&schemas, &using_columns)
-    }
-
-    // Internal implementation of normalize
-    fn normalize_with_schemas(
-        self,
-        schemas: &[&Arc<DFSchema>],
-        using_columns: &[HashSet<Column>],
-    ) -> Result<Self> {
-        if self.relation.is_some() {
-            return Ok(self);
-        }
-
-        for schema in schemas {
-            let fields = schema.fields_with_unqualified_name(&self.name);
-            match fields.len() {
-                0 => continue,
-                1 => {
-                    return Ok(fields[0].qualified_column());
-                }
-                _ => {
-                    // More than 1 fields in this schema have their names set to self.name.
-                    //
-                    // This should only happen when a JOIN query with USING constraint references
-                    // join columns using unqualified column name. For example:
-                    //
-                    // ```sql
-                    // SELECT id FROM t1 JOIN t2 USING(id)
-                    // ```
-                    //
-                    // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
-                    // We will use the relation from the first matched field to normalize self.
-
-                    // Compare matched fields with one USING JOIN clause at a time
-                    for using_col in using_columns {
-                        let all_matched = fields
-                            .iter()
-                            .all(|f| using_col.contains(&f.qualified_column()));
-                        // All matched fields belong to the same using column set, in orther words
-                        // the same join clause. We simply pick the qualifer from the first match.
-                        if all_matched {
-                            return Ok(fields[0].qualified_column());
-                        }
-                    }
-                }
-            }
-        }
-
-        Err(DataFusionError::Plan(format!(
-            "Column {} not found in provided schemas",
-            self
-        )))
-    }
-}
-
-impl From<&str> for Column {
-    fn from(c: &str) -> Self {
-        Self::from_qualified_name(c)
-    }
-}
-
-impl FromStr for Column {
-    type Err = Infallible;
-
-    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        Ok(s.into())
-    }
-}
-
-impl fmt::Display for Column {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match &self.relation {
-            Some(r) => write!(f, "#{}.{}", r, self.name),
-            None => write!(f, "#{}", self.name),
-        }
-    }
-}
-
 /// `Expr` is a central struct of DataFusion's query API, and
 /// represent logical expressions such as `A + 1`, or `CAST(c1 AS
 /// int)`.
@@ -392,40 +254,6 @@ impl PartialOrd for Expr {
     }
 }
 
-/// Provides schema information needed by [Expr] methods such as
-/// [Expr::nullable] and [Expr::data_type].
-///
-/// Note that this trait is implemented for &[DFSchema] which is
-/// widely used in the DataFusion codebase.
-pub trait ExprSchema {
-    /// Is this column reference nullable?
-    fn nullable(&self, col: &Column) -> Result<bool>;
-
-    /// What is the datatype of this column?
-    fn data_type(&self, col: &Column) -> Result<&DataType>;
-}
-
-// Implement `ExprSchema` for `Arc<DFSchema>`
-impl<P: AsRef<DFSchema>> ExprSchema for P {
-    fn nullable(&self, col: &Column) -> Result<bool> {
-        self.as_ref().nullable(col)
-    }
-
-    fn data_type(&self, col: &Column) -> Result<&DataType> {
-        self.as_ref().data_type(col)
-    }
-}
-
-impl ExprSchema for DFSchema {
-    fn nullable(&self, col: &Column) -> Result<bool> {
-        Ok(self.field_from_column(col)?.is_nullable())
-    }
-
-    fn data_type(&self, col: &Column) -> Result<&DataType> {
-        Ok(self.field_from_column(col)?.data_type())
-    }
-}
-
 impl Expr {
     /// Returns the [arrow::datatypes::DataType] of the expression
     /// based on [ExprSchema]