You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 05:23:58 UTC
[arrow-datafusion] branch datafusion-common-column updated: move column, dfschema, etc. to common module
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch datafusion-common-column
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/datafusion-common-column by this push:
new 3bf434a move column, dfschema, etc. to common module
3bf434a is described below
commit 3bf434abb5b268751de7a09b4702739e7fc11f37
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:23:43 2022 +0800
move column, dfschema, etc. to common module
---
datafusion-common/src/column.rs | 2 +-
datafusion-common/src/dfschema.rs | 34 ++
datafusion-common/src/lib.rs | 2 +-
datafusion/src/logical_plan/builder.rs | 48 ++-
datafusion/src/logical_plan/dfschema.rs | 669 +-------------------------------
datafusion/src/logical_plan/expr.rs | 174 +--------
6 files changed, 72 insertions(+), 857 deletions(-)
diff --git a/datafusion-common/src/column.rs b/datafusion-common/src/column.rs
index 5d68916..02faa24 100644
--- a/datafusion-common/src/column.rs
+++ b/datafusion-common/src/column.rs
@@ -75,7 +75,7 @@ impl Column {
}
// Internal implementation of normalize
- fn normalize_with_schemas(
+ pub fn normalize_with_schemas(
self,
schemas: &[&Arc<DFSchema>],
using_columns: &[HashSet<Column>],
diff --git a/datafusion-common/src/dfschema.rs b/datafusion-common/src/dfschema.rs
index 453932a..46321c3 100644
--- a/datafusion-common/src/dfschema.rs
+++ b/datafusion-common/src/dfschema.rs
@@ -402,6 +402,40 @@ impl Display for DFSchema {
}
}
+/// Provides schema information needed by [Expr] methods such as
+/// [Expr::nullable] and [Expr::data_type].
+///
+/// Note that this trait is implemented for &[DFSchema] which is
+/// widely used in the DataFusion codebase.
+pub trait ExprSchema {
+ /// Is this column reference nullable?
+ fn nullable(&self, col: &Column) -> Result<bool>;
+
+ /// What is the datatype of this column?
+ fn data_type(&self, col: &Column) -> Result<&DataType>;
+}
+
+// Implement `ExprSchema` for any `P: AsRef<DFSchema>` (e.g. `Arc<DFSchema>`)
+impl<P: AsRef<DFSchema>> ExprSchema for P {
+ fn nullable(&self, col: &Column) -> Result<bool> {
+ self.as_ref().nullable(col)
+ }
+
+ fn data_type(&self, col: &Column) -> Result<&DataType> {
+ self.as_ref().data_type(col)
+ }
+}
+
+impl ExprSchema for DFSchema {
+ fn nullable(&self, col: &Column) -> Result<bool> {
+ Ok(self.field_from_column(col)?.is_nullable())
+ }
+
+ fn data_type(&self, col: &Column) -> Result<&DataType> {
+ Ok(self.field_from_column(col)?.data_type())
+ }
+}
+
/// DFField wraps an Arrow field and adds an optional qualifier
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFField {
diff --git a/datafusion-common/src/lib.rs b/datafusion-common/src/lib.rs
index 8090fc0..11f9bbb 100644
--- a/datafusion-common/src/lib.rs
+++ b/datafusion-common/src/lib.rs
@@ -20,5 +20,5 @@ mod dfschema;
mod error;
pub use column::Column;
-pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
+pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
pub use error::{DataFusionError, Result};
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 613c8e9..d81fa9d 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -595,6 +595,17 @@ impl LogicalPlanBuilder {
self.join_detailed(right, join_type, join_keys, false)
}
+ fn normalize(
+ plan: &LogicalPlan,
+ column: impl Into<Column> + Clone,
+ ) -> Result<Column> {
+ let schemas = plan.all_schemas();
+ let using_columns = plan.using_columns()?;
+ column
+ .into()
+ .normalize_with_schemas(&schemas, &using_columns)
+ }
+
/// Apply a join with on constraint and specified null equality
/// If null_equals_null is true then null == null, else null != null
pub fn join_detailed(
@@ -633,7 +644,10 @@ impl LogicalPlanBuilder {
match (l_is_left, l_is_right, r_is_left, r_is_right) {
(_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
(Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
- _ => (l.normalize(&self.plan), r.normalize(right)),
+ _ => (
+ Self::normalize(&self.plan, l),
+ Self::normalize(right, r),
+ ),
}
}
(Some(lr), None) => {
@@ -643,9 +657,12 @@ impl LogicalPlanBuilder {
right.schema().field_with_qualified_name(lr, &l.name);
match (l_is_left, l_is_right) {
- (Ok(_), _) => (Ok(l), r.normalize(right)),
- (_, Ok(_)) => (r.normalize(&self.plan), Ok(l)),
- _ => (l.normalize(&self.plan), r.normalize(right)),
+ (Ok(_), _) => (Ok(l), Self::normalize(right, r)),
+ (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
+ _ => (
+ Self::normalize(&self.plan, l),
+ Self::normalize(right, r),
+ ),
}
}
(None, Some(rr)) => {
@@ -655,22 +672,25 @@ impl LogicalPlanBuilder {
right.schema().field_with_qualified_name(rr, &r.name);
match (r_is_left, r_is_right) {
- (Ok(_), _) => (Ok(r), l.normalize(right)),
- (_, Ok(_)) => (l.normalize(&self.plan), Ok(r)),
- _ => (l.normalize(&self.plan), r.normalize(right)),
+ (Ok(_), _) => (Ok(r), Self::normalize(right, l)),
+ (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
+ _ => (
+ Self::normalize(&self.plan, l),
+ Self::normalize(right, r),
+ ),
}
}
(None, None) => {
let mut swap = false;
- let left_key =
- l.clone().normalize(&self.plan).or_else(|_| {
+ let left_key = Self::normalize(&self.plan, l.clone())
+ .or_else(|_| {
swap = true;
- l.normalize(right)
+ Self::normalize(right, l)
});
if swap {
- (r.normalize(&self.plan), left_key)
+ (Self::normalize(&self.plan, r), left_key)
} else {
- (left_key, r.normalize(right))
+ (left_key, Self::normalize(right, r))
}
}
}
@@ -705,11 +725,11 @@ impl LogicalPlanBuilder {
let left_keys: Vec<Column> = using_keys
.clone()
.into_iter()
- .map(|c| c.into().normalize(&self.plan))
+ .map(|c| Self::normalize(&self.plan, c))
.collect::<Result<_>>()?;
let right_keys: Vec<Column> = using_keys
.into_iter()
- .map(|c| c.into().normalize(right))
+ .map(|c| Self::normalize(right, c))
.collect::<Result<_>>()?;
let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect();
diff --git a/datafusion/src/logical_plan/dfschema.rs b/datafusion/src/logical_plan/dfschema.rs
index 7b6471f..eb62428 100644
--- a/datafusion/src/logical_plan/dfschema.rs
+++ b/datafusion/src/logical_plan/dfschema.rs
@@ -18,671 +18,4 @@
//! DFSchema is an extended schema struct that DataFusion uses to provide support for
//! fields with optional relation names.
-use std::collections::HashSet;
-use std::convert::TryFrom;
-use std::sync::Arc;
-
-use crate::error::{DataFusionError, Result};
-use crate::logical_plan::Column;
-
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use std::fmt::{Display, Formatter};
-
-/// A reference-counted reference to a `DFSchema`.
-pub type DFSchemaRef = Arc<DFSchema>;
-
-/// DFSchema wraps an Arrow schema and adds relation names
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct DFSchema {
- /// Fields
- fields: Vec<DFField>,
-}
-
-impl DFSchema {
- /// Creates an empty `DFSchema`
- pub fn empty() -> Self {
- Self { fields: vec![] }
- }
-
- /// Create a new `DFSchema`
- pub fn new(fields: Vec<DFField>) -> Result<Self> {
- let mut qualified_names = HashSet::new();
- let mut unqualified_names = HashSet::new();
-
- for field in &fields {
- if let Some(qualifier) = field.qualifier() {
- if !qualified_names.insert((qualifier, field.name())) {
- return Err(DataFusionError::Plan(format!(
- "Schema contains duplicate qualified field name '{}'",
- field.qualified_name()
- )));
- }
- } else if !unqualified_names.insert(field.name()) {
- return Err(DataFusionError::Plan(format!(
- "Schema contains duplicate unqualified field name '{}'",
- field.name()
- )));
- }
- }
-
- // check for mix of qualified and unqualified field with same unqualified name
- // note that we need to sort the contents of the HashSet first so that errors are
- // deterministic
- let mut qualified_names = qualified_names
- .iter()
- .map(|(l, r)| (l.to_owned(), r.to_owned()))
- .collect::<Vec<(&String, &String)>>();
- qualified_names.sort_by(|a, b| {
- let a = format!("{}.{}", a.0, a.1);
- let b = format!("{}.{}", b.0, b.1);
- a.cmp(&b)
- });
- for (qualifier, name) in &qualified_names {
- if unqualified_names.contains(name) {
- return Err(DataFusionError::Plan(format!(
- "Schema contains qualified field name '{}.{}' \
- and unqualified field name '{}' which would be ambiguous",
- qualifier, name, name
- )));
- }
- }
- Ok(Self { fields })
- }
-
- /// Create a `DFSchema` from an Arrow schema
- pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) -> Result<Self> {
- Self::new(
- schema
- .fields()
- .iter()
- .map(|f| DFField::from_qualified(qualifier, f.clone()))
- .collect(),
- )
- }
-
- /// Combine two schemas
- pub fn join(&self, schema: &DFSchema) -> Result<Self> {
- let mut fields = self.fields.clone();
- fields.extend_from_slice(schema.fields().as_slice());
- Self::new(fields)
- }
-
- /// Merge a schema into self
- pub fn merge(&mut self, other_schema: &DFSchema) {
- for field in other_schema.fields() {
- // skip duplicate columns
- let duplicated_field = match field.qualifier() {
- Some(q) => self.field_with_name(Some(q.as_str()), field.name()).is_ok(),
- // for unqualifed columns, check as unqualified name
- None => self.field_with_unqualified_name(field.name()).is_ok(),
- };
- if !duplicated_field {
- self.fields.push(field.clone());
- }
- }
- }
-
- /// Get a list of fields
- pub fn fields(&self) -> &Vec<DFField> {
- &self.fields
- }
-
- /// Returns an immutable reference of a specific `Field` instance selected using an
- /// offset within the internal `fields` vector
- pub fn field(&self, i: usize) -> &DFField {
- &self.fields[i]
- }
-
- /// Find the index of the column with the given unqualified name
- pub fn index_of(&self, name: &str) -> Result<usize> {
- for i in 0..self.fields.len() {
- if self.fields[i].name() == name {
- return Ok(i);
- }
- }
- Err(DataFusionError::Plan(format!(
- "No field named '{}'. Valid fields are {}.",
- name,
- self.get_field_names()
- )))
- }
-
- fn index_of_column_by_name(
- &self,
- qualifier: Option<&str>,
- name: &str,
- ) -> Result<usize> {
- let mut matches = self
- .fields
- .iter()
- .enumerate()
- .filter(|(_, field)| match (qualifier, &field.qualifier) {
- // field to lookup is qualified.
- // current field is qualified and not shared between relations, compare both
- // qualifier and name.
- (Some(q), Some(field_q)) => q == field_q && field.name() == name,
- // field to lookup is qualified but current field is unqualified.
- (Some(_), None) => false,
- // field to lookup is unqualified, no need to compare qualifier
- (None, Some(_)) | (None, None) => field.name() == name,
- })
- .map(|(idx, _)| idx);
- match matches.next() {
- None => Err(DataFusionError::Plan(format!(
- "No field named '{}.{}'. Valid fields are {}.",
- qualifier.unwrap_or("<unqualified>"),
- name,
- self.get_field_names()
- ))),
- Some(idx) => match matches.next() {
- None => Ok(idx),
- // found more than one matches
- Some(_) => Err(DataFusionError::Internal(format!(
- "Ambiguous reference to qualified field named '{}.{}'",
- qualifier.unwrap_or("<unqualified>"),
- name
- ))),
- },
- }
- }
-
- /// Find the index of the column with the given qualifier and name
- pub fn index_of_column(&self, col: &Column) -> Result<usize> {
- self.index_of_column_by_name(col.relation.as_deref(), &col.name)
- }
-
- /// Find the field with the given name
- pub fn field_with_name(
- &self,
- qualifier: Option<&str>,
- name: &str,
- ) -> Result<&DFField> {
- if let Some(qualifier) = qualifier {
- self.field_with_qualified_name(qualifier, name)
- } else {
- self.field_with_unqualified_name(name)
- }
- }
-
- /// Find all fields match the given name
- pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
- self.fields
- .iter()
- .filter(|field| field.name() == name)
- .collect()
- }
-
- /// Find the field with the given name
- pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> {
- let matches = self.fields_with_unqualified_name(name);
- match matches.len() {
- 0 => Err(DataFusionError::Plan(format!(
- "No field with unqualified name '{}'. Valid fields are {}.",
- name,
- self.get_field_names()
- ))),
- 1 => Ok(matches[0]),
- _ => Err(DataFusionError::Plan(format!(
- "Ambiguous reference to field named '{}'",
- name
- ))),
- }
- }
-
- /// Find the field with the given qualified name
- pub fn field_with_qualified_name(
- &self,
- qualifier: &str,
- name: &str,
- ) -> Result<&DFField> {
- let idx = self.index_of_column_by_name(Some(qualifier), name)?;
- Ok(self.field(idx))
- }
-
- /// Find the field with the given qualified column
- pub fn field_from_column(&self, column: &Column) -> Result<&DFField> {
- match &column.relation {
- Some(r) => self.field_with_qualified_name(r, &column.name),
- None => self.field_with_unqualified_name(&column.name),
- }
- }
-
- /// Check to see if unqualified field names matches field names in Arrow schema
- pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
- self.fields
- .iter()
- .zip(arrow_schema.fields().iter())
- .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
- }
-
- /// Strip all field qualifier in schema
- pub fn strip_qualifiers(self) -> Self {
- DFSchema {
- fields: self
- .fields
- .into_iter()
- .map(|f| f.strip_qualifier())
- .collect(),
- }
- }
-
- /// Replace all field qualifier with new value in schema
- pub fn replace_qualifier(self, qualifier: &str) -> Self {
- DFSchema {
- fields: self
- .fields
- .into_iter()
- .map(|f| {
- DFField::new(
- Some(qualifier),
- f.name(),
- f.data_type().to_owned(),
- f.is_nullable(),
- )
- })
- .collect(),
- }
- }
-
- /// Get comma-seperated list of field names for use in error messages
- fn get_field_names(&self) -> String {
- self.fields
- .iter()
- .map(|f| match f.qualifier() {
- Some(qualifier) => format!("'{}.{}'", qualifier, f.name()),
- None => format!("'{}'", f.name()),
- })
- .collect::<Vec<_>>()
- .join(", ")
- }
-}
-
-impl From<DFSchema> for Schema {
- /// Convert DFSchema into a Schema
- fn from(df_schema: DFSchema) -> Self {
- Schema::new(
- df_schema
- .fields
- .into_iter()
- .map(|f| {
- if f.qualifier().is_some() {
- Field::new(
- f.name().as_str(),
- f.data_type().to_owned(),
- f.is_nullable(),
- )
- } else {
- f.field
- }
- })
- .collect(),
- )
- }
-}
-
-impl From<&DFSchema> for Schema {
- /// Convert DFSchema reference into a Schema
- fn from(df_schema: &DFSchema) -> Self {
- Schema::new(df_schema.fields.iter().map(|f| f.field.clone()).collect())
- }
-}
-
-/// Create a `DFSchema` from an Arrow schema
-impl TryFrom<Schema> for DFSchema {
- type Error = DataFusionError;
- fn try_from(schema: Schema) -> std::result::Result<Self, Self::Error> {
- Self::new(
- schema
- .fields()
- .iter()
- .map(|f| DFField::from(f.clone()))
- .collect(),
- )
- }
-}
-
-impl From<DFSchema> for SchemaRef {
- fn from(df_schema: DFSchema) -> Self {
- SchemaRef::new(df_schema.into())
- }
-}
-
-/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
-pub trait ToDFSchema
-where
- Self: Sized,
-{
- /// Attempt to create a DSSchema
- #[allow(clippy::wrong_self_convention)]
- fn to_dfschema(self) -> Result<DFSchema>;
-
- /// Attempt to create a DSSchemaRef
- #[allow(clippy::wrong_self_convention)]
- fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
- Ok(Arc::new(self.to_dfschema()?))
- }
-}
-
-impl ToDFSchema for Schema {
- #[allow(clippy::wrong_self_convention)]
- fn to_dfschema(self) -> Result<DFSchema> {
- DFSchema::try_from(self)
- }
-}
-
-impl ToDFSchema for SchemaRef {
- #[allow(clippy::wrong_self_convention)]
- fn to_dfschema(self) -> Result<DFSchema> {
- // Attempt to use the Schema directly if there are no other
- // references, otherwise clone
- match Self::try_unwrap(self) {
- Ok(schema) => DFSchema::try_from(schema),
- Err(schemaref) => DFSchema::try_from(schemaref.as_ref().clone()),
- }
- }
-}
-
-impl ToDFSchema for Vec<DFField> {
- fn to_dfschema(self) -> Result<DFSchema> {
- DFSchema::new(self)
- }
-}
-
-impl Display for DFSchema {
- fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
- write!(
- f,
- "{}",
- self.fields
- .iter()
- .map(|field| field.qualified_name())
- .collect::<Vec<String>>()
- .join(", ")
- )
- }
-}
-
-/// DFField wraps an Arrow field and adds an optional qualifier
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct DFField {
- /// Optional qualifier (usually a table or relation name)
- qualifier: Option<String>,
- /// Arrow field definition
- field: Field,
-}
-
-impl DFField {
- /// Creates a new `DFField`
- pub fn new(
- qualifier: Option<&str>,
- name: &str,
- data_type: DataType,
- nullable: bool,
- ) -> Self {
- DFField {
- qualifier: qualifier.map(|s| s.to_owned()),
- field: Field::new(name, data_type, nullable),
- }
- }
-
- /// Create an unqualified field from an existing Arrow field
- pub fn from(field: Field) -> Self {
- Self {
- qualifier: None,
- field,
- }
- }
-
- /// Create a qualified field from an existing Arrow field
- pub fn from_qualified(qualifier: &str, field: Field) -> Self {
- Self {
- qualifier: Some(qualifier.to_owned()),
- field,
- }
- }
-
- /// Returns an immutable reference to the `DFField`'s unqualified name
- pub fn name(&self) -> &String {
- self.field.name()
- }
-
- /// Returns an immutable reference to the `DFField`'s data-type
- pub fn data_type(&self) -> &DataType {
- self.field.data_type()
- }
-
- /// Indicates whether this `DFField` supports null values
- pub fn is_nullable(&self) -> bool {
- self.field.is_nullable()
- }
-
- /// Returns a string to the `DFField`'s qualified name
- pub fn qualified_name(&self) -> String {
- if let Some(qualifier) = &self.qualifier {
- format!("{}.{}", qualifier, self.field.name())
- } else {
- self.field.name().to_owned()
- }
- }
-
- /// Builds a qualified column based on self
- pub fn qualified_column(&self) -> Column {
- Column {
- relation: self.qualifier.clone(),
- name: self.field.name().to_string(),
- }
- }
-
- /// Builds an unqualified column based on self
- pub fn unqualified_column(&self) -> Column {
- Column {
- relation: None,
- name: self.field.name().to_string(),
- }
- }
-
- /// Get the optional qualifier
- pub fn qualifier(&self) -> Option<&String> {
- self.qualifier.as_ref()
- }
-
- /// Get the arrow field
- pub fn field(&self) -> &Field {
- &self.field
- }
-
- /// Return field with qualifier stripped
- pub fn strip_qualifier(mut self) -> Self {
- self.qualifier = None;
- self
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use arrow::datatypes::DataType;
-
- #[test]
- fn from_unqualified_field() {
- let field = Field::new("c0", DataType::Boolean, true);
- let field = DFField::from(field);
- assert_eq!("c0", field.name());
- assert_eq!("c0", field.qualified_name());
- }
-
- #[test]
- fn from_qualified_field() {
- let field = Field::new("c0", DataType::Boolean, true);
- let field = DFField::from_qualified("t1", field);
- assert_eq!("c0", field.name());
- assert_eq!("t1.c0", field.qualified_name());
- }
-
- #[test]
- fn from_unqualified_schema() -> Result<()> {
- let schema = DFSchema::try_from(test_schema_1())?;
- assert_eq!("c0, c1", schema.to_string());
- Ok(())
- }
-
- #[test]
- fn from_qualified_schema() -> Result<()> {
- let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
- assert_eq!("t1.c0, t1.c1", schema.to_string());
- Ok(())
- }
-
- #[test]
- fn from_qualified_schema_into_arrow_schema() -> Result<()> {
- let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
- let arrow_schema: Schema = schema.into();
- let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, \
- Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }";
- assert_eq!(expected, arrow_schema.to_string());
- Ok(())
- }
-
- #[test]
- fn join_qualified() -> Result<()> {
- let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
- let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
- let join = left.join(&right)?;
- assert_eq!("t1.c0, t1.c1, t2.c0, t2.c1", join.to_string());
- // test valid access
- assert!(join.field_with_qualified_name("t1", "c0").is_ok());
- assert!(join.field_with_qualified_name("t2", "c0").is_ok());
- // test invalid access
- assert!(join.field_with_unqualified_name("c0").is_err());
- assert!(join.field_with_unqualified_name("t1.c0").is_err());
- assert!(join.field_with_unqualified_name("t2.c0").is_err());
- Ok(())
- }
-
- #[test]
- fn join_qualified_duplicate() -> Result<()> {
- let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
- let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
- let join = left.join(&right);
- assert!(join.is_err());
- assert_eq!(
- "Error during planning: Schema contains duplicate \
- qualified field name \'t1.c0\'",
- &format!("{}", join.err().unwrap())
- );
- Ok(())
- }
-
- #[test]
- fn join_unqualified_duplicate() -> Result<()> {
- let left = DFSchema::try_from(test_schema_1())?;
- let right = DFSchema::try_from(test_schema_1())?;
- let join = left.join(&right);
- assert!(join.is_err());
- assert_eq!(
- "Error during planning: Schema contains duplicate \
- unqualified field name \'c0\'",
- &format!("{}", join.err().unwrap())
- );
- Ok(())
- }
-
- #[test]
- fn join_mixed() -> Result<()> {
- let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
- let right = DFSchema::try_from(test_schema_2())?;
- let join = left.join(&right)?;
- assert_eq!("t1.c0, t1.c1, c100, c101", join.to_string());
- // test valid access
- assert!(join.field_with_qualified_name("t1", "c0").is_ok());
- assert!(join.field_with_unqualified_name("c0").is_ok());
- assert!(join.field_with_unqualified_name("c100").is_ok());
- assert!(join.field_with_name(None, "c100").is_ok());
- // test invalid access
- assert!(join.field_with_unqualified_name("t1.c0").is_err());
- assert!(join.field_with_unqualified_name("t1.c100").is_err());
- assert!(join.field_with_qualified_name("", "c100").is_err());
- Ok(())
- }
-
- #[test]
- fn join_mixed_duplicate() -> Result<()> {
- let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
- let right = DFSchema::try_from(test_schema_1())?;
- let join = left.join(&right);
- assert!(join.is_err());
- assert_eq!(
- "Error during planning: Schema contains qualified \
- field name \'t1.c0\' and unqualified field name \'c0\' which would be ambiguous",
- &format!("{}", join.err().unwrap())
- );
- Ok(())
- }
-
- #[test]
- fn helpful_error_messages() -> Result<()> {
- let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
- let expected_help = "Valid fields are \'t1.c0\', \'t1.c1\'.";
- assert!(schema
- .field_with_qualified_name("x", "y")
- .unwrap_err()
- .to_string()
- .contains(expected_help));
- assert!(schema
- .field_with_unqualified_name("y")
- .unwrap_err()
- .to_string()
- .contains(expected_help));
- assert!(schema
- .index_of("y")
- .unwrap_err()
- .to_string()
- .contains(expected_help));
- Ok(())
- }
-
- #[test]
- fn into() {
- // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
- let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Int64, true)]);
- let arrow_schema_ref = Arc::new(arrow_schema.clone());
-
- let df_schema =
- DFSchema::new(vec![DFField::new(None, "c0", DataType::Int64, true)]).unwrap();
- let df_schema_ref = Arc::new(df_schema.clone());
-
- {
- let arrow_schema = arrow_schema.clone();
- let arrow_schema_ref = arrow_schema_ref.clone();
-
- assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
- assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
- }
-
- {
- let arrow_schema = arrow_schema.clone();
- let arrow_schema_ref = arrow_schema_ref.clone();
-
- assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
- assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
- }
-
- // Now, consume the refs
- assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
- assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
- }
-
- fn test_schema_1() -> Schema {
- Schema::new(vec![
- Field::new("c0", DataType::Boolean, true),
- Field::new("c1", DataType::Boolean, true),
- ])
- }
-
- fn test_schema_2() -> Schema {
- Schema::new(vec![
- Field::new("c100", DataType::Boolean, true),
- Field::new("c101", DataType::Boolean, true),
- ])
- }
-}
+pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 300c751..fba7d81 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -34,152 +34,14 @@ use crate::physical_plan::{
use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};
+pub use datafusion_common::{Column, ExprSchema};
use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use std::collections::{HashMap, HashSet};
-use std::convert::Infallible;
use std::fmt;
use std::hash::{BuildHasher, Hash, Hasher};
use std::ops::Not;
-use std::str::FromStr;
use std::sync::Arc;
-/// A named reference to a qualified field in a schema.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
-pub struct Column {
- /// relation/table name.
- pub relation: Option<String>,
- /// field/column name.
- pub name: String,
-}
-
-impl Column {
- /// Create Column from unqualified name.
- pub fn from_name(name: impl Into<String>) -> Self {
- Self {
- relation: None,
- name: name.into(),
- }
- }
-
- /// Deserialize a fully qualified name string into a column
- pub fn from_qualified_name(flat_name: &str) -> Self {
- use sqlparser::tokenizer::Token;
-
- let dialect = sqlparser::dialect::GenericDialect {};
- let mut tokenizer = sqlparser::tokenizer::Tokenizer::new(&dialect, flat_name);
- if let Ok(tokens) = tokenizer.tokenize() {
- if let [Token::Word(relation), Token::Period, Token::Word(name)] =
- tokens.as_slice()
- {
- return Column {
- relation: Some(relation.value.clone()),
- name: name.value.clone(),
- };
- }
- }
- // any expression that's not in the form of `foo.bar` will be treated as unqualified column
- // name
- Column {
- relation: None,
- name: String::from(flat_name),
- }
- }
-
- /// Serialize column into a flat name string
- pub fn flat_name(&self) -> String {
- match &self.relation {
- Some(r) => format!("{}.{}", r, self.name),
- None => self.name.clone(),
- }
- }
-
- /// Normalizes `self` if is unqualified (has no relation name)
- /// with an explicit qualifier from the first matching input
- /// schemas.
- ///
- /// For example, `foo` will be normalized to `t.foo` if there is a
- /// column named `foo` in a relation named `t` found in `schemas`
- pub fn normalize(self, plan: &LogicalPlan) -> Result<Self> {
- let schemas = plan.all_schemas();
- let using_columns = plan.using_columns()?;
- self.normalize_with_schemas(&schemas, &using_columns)
- }
-
- // Internal implementation of normalize
- fn normalize_with_schemas(
- self,
- schemas: &[&Arc<DFSchema>],
- using_columns: &[HashSet<Column>],
- ) -> Result<Self> {
- if self.relation.is_some() {
- return Ok(self);
- }
-
- for schema in schemas {
- let fields = schema.fields_with_unqualified_name(&self.name);
- match fields.len() {
- 0 => continue,
- 1 => {
- return Ok(fields[0].qualified_column());
- }
- _ => {
- // More than 1 fields in this schema have their names set to self.name.
- //
- // This should only happen when a JOIN query with USING constraint references
- // join columns using unqualified column name. For example:
- //
- // ```sql
- // SELECT id FROM t1 JOIN t2 USING(id)
- // ```
- //
- // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
- // We will use the relation from the first matched field to normalize self.
-
- // Compare matched fields with one USING JOIN clause at a time
- for using_col in using_columns {
- let all_matched = fields
- .iter()
- .all(|f| using_col.contains(&f.qualified_column()));
- // All matched fields belong to the same using column set, in orther words
- // the same join clause. We simply pick the qualifer from the first match.
- if all_matched {
- return Ok(fields[0].qualified_column());
- }
- }
- }
- }
- }
-
- Err(DataFusionError::Plan(format!(
- "Column {} not found in provided schemas",
- self
- )))
- }
-}
-
-impl From<&str> for Column {
- fn from(c: &str) -> Self {
- Self::from_qualified_name(c)
- }
-}
-
-impl FromStr for Column {
- type Err = Infallible;
-
- fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
- Ok(s.into())
- }
-}
-
-impl fmt::Display for Column {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match &self.relation {
- Some(r) => write!(f, "#{}.{}", r, self.name),
- None => write!(f, "#{}", self.name),
- }
- }
-}
-
/// `Expr` is a central struct of DataFusion's query API, and
/// represent logical expressions such as `A + 1`, or `CAST(c1 AS
/// int)`.
@@ -392,40 +254,6 @@ impl PartialOrd for Expr {
}
}
-/// Provides schema information needed by [Expr] methods such as
-/// [Expr::nullable] and [Expr::data_type].
-///
-/// Note that this trait is implemented for &[DFSchema] which is
-/// widely used in the DataFusion codebase.
-pub trait ExprSchema {
- /// Is this column reference nullable?
- fn nullable(&self, col: &Column) -> Result<bool>;
-
- /// What is the datatype of this column?
- fn data_type(&self, col: &Column) -> Result<&DataType>;
-}
-
-// Implement `ExprSchema` for `Arc<DFSchema>`
-impl<P: AsRef<DFSchema>> ExprSchema for P {
- fn nullable(&self, col: &Column) -> Result<bool> {
- self.as_ref().nullable(col)
- }
-
- fn data_type(&self, col: &Column) -> Result<&DataType> {
- self.as_ref().data_type(col)
- }
-}
-
-impl ExprSchema for DFSchema {
- fn nullable(&self, col: &Column) -> Result<bool> {
- Ok(self.field_from_column(col)?.is_nullable())
- }
-
- fn data_type(&self, col: &Column) -> Result<&DataType> {
- Ok(self.field_from_column(col)?.data_type())
- }
-}
-
impl Expr {
/// Returns the [arrow::datatypes::DataType] of the expression
/// based on [ExprSchema]