You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 05:30:47 UTC
[arrow-datafusion] 01/02: move dfschema and column
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch datafusion-common-column
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 0a93a2ce17fca485e9d2a5c8a666ed69a4d4c167
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:08:53 2022 +0800
move dfschema and column
---
datafusion-common/src/column.rs | 150 +++++++++
datafusion-common/src/dfschema.rs | 688 ++++++++++++++++++++++++++++++++++++++
datafusion-common/src/lib.rs | 4 +
3 files changed, 842 insertions(+)
diff --git a/datafusion-common/src/column.rs b/datafusion-common/src/column.rs
new file mode 100644
index 0000000..5d68916
--- /dev/null
+++ b/datafusion-common/src/column.rs
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Column
+
+use crate::{DFSchema, DataFusionError, Result};
+use std::collections::HashSet;
+use std::convert::Infallible;
+use std::fmt;
+use std::str::FromStr;
+use std::sync::Arc;
+
+/// A named reference to a qualified field in a schema.
+///
+/// `relation` is `None` for an unqualified reference (see `Column::from_name`).
+///
+/// The derived `PartialOrd`/`Ord` compare `relation` before `name` because derives
+/// follow field declaration order; reordering the fields changes the sort order.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+pub struct Column {
+ /// relation/table name; `None` when the column is unqualified.
+ pub relation: Option<String>,
+ /// field/column name.
+ pub name: String,
+}
+
+impl Column {
+    /// Create Column from unqualified name.
+    pub fn from_name(name: impl Into<String>) -> Self {
+        Self {
+            relation: None,
+            name: name.into(),
+        }
+    }
+
+    /// Deserialize a fully qualified name string into a column.
+    ///
+    /// `flat_name` is tokenized with sqlparser's `GenericDialect`. Only input that
+    /// tokenizes to exactly `<word> . <word>` (for example `foo.bar`) produces a
+    /// qualified column; anything else, including input that fails to tokenize, is
+    /// kept verbatim as the name of an unqualified column.
+    pub fn from_qualified_name(flat_name: &str) -> Self {
+        use sqlparser::tokenizer::Token;
+
+        let dialect = sqlparser::dialect::GenericDialect {};
+        let mut tokenizer = sqlparser::tokenizer::Tokenizer::new(&dialect, flat_name);
+        if let Ok(tokens) = tokenizer.tokenize() {
+            if let [Token::Word(relation), Token::Period, Token::Word(name)] =
+                tokens.as_slice()
+            {
+                return Column {
+                    relation: Some(relation.value.clone()),
+                    name: name.value.clone(),
+                };
+            }
+        }
+        // any expression that's not in the form of `foo.bar` will be treated as unqualified column
+        // name
+        Column {
+            relation: None,
+            name: String::from(flat_name),
+        }
+    }
+
+    /// Serialize column into a flat name string: `relation.name` for a qualified
+    /// column, just `name` otherwise.
+    pub fn flat_name(&self) -> String {
+        match &self.relation {
+            Some(r) => format!("{}.{}", r, self.name),
+            None => self.name.clone(),
+        }
+    }
+
+    /// Qualify this column using the fields of `schemas`.
+    ///
+    /// This is the schema-level building block for normalizing a column; it is
+    /// public so that crates depending on `datafusion-common` can call it.
+    ///
+    /// * A column that already has a relation is returned unchanged.
+    /// * Otherwise `schemas` are searched in order. A schema with exactly one field
+    ///   named `self.name` resolves the column to that field's qualified column.
+    /// * A schema with several such fields resolves the column only if all of their
+    ///   qualified columns belong to one set in `using_columns` (i.e. the same
+    ///   `JOIN ... USING` clause); the first matching field's qualifier is then used.
+    ///   If no set contains all of them, the search moves on to the next schema.
+    ///
+    /// # Errors
+    ///
+    /// Returns `DataFusionError::Plan` if no schema resolves the column.
+    pub fn normalize_with_schemas(
+        self,
+        schemas: &[&Arc<DFSchema>],
+        using_columns: &[HashSet<Column>],
+    ) -> Result<Self> {
+        if self.relation.is_some() {
+            return Ok(self);
+        }
+
+        for schema in schemas {
+            let fields = schema.fields_with_unqualified_name(&self.name);
+            match fields.len() {
+                0 => continue,
+                1 => {
+                    return Ok(fields[0].qualified_column());
+                }
+                _ => {
+                    // More than one field in this schema has its name set to self.name.
+                    //
+                    // This should only happen when a JOIN query with USING constraint references
+                    // join columns using unqualified column name. For example:
+                    //
+                    // ```sql
+                    // SELECT id FROM t1 JOIN t2 USING(id)
+                    // ```
+                    //
+                    // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
+                    // We will use the relation from the first matched field to normalize self.
+
+                    // Compare matched fields with one USING JOIN clause at a time
+                    for using_col in using_columns {
+                        let all_matched = fields
+                            .iter()
+                            .all(|f| using_col.contains(&f.qualified_column()));
+                        // All matched fields belong to the same using column set, in other words
+                        // the same join clause. We simply pick the qualifier from the first match.
+                        if all_matched {
+                            return Ok(fields[0].qualified_column());
+                        }
+                    }
+                }
+            }
+        }
+
+        Err(DataFusionError::Plan(format!(
+            "Column {} not found in provided schemas",
+            self
+        )))
+    }
+}
+
+impl From<&str> for Column {
+    /// Parses `c` with `Column::from_qualified_name`, so `"t.c"` becomes a
+    /// qualified column and any other text an unqualified one.
+    fn from(c: &str) -> Self {
+        Column::from_qualified_name(c)
+    }
+}
+
+impl FromStr for Column {
+    type Err = Infallible;
+
+    /// Parsing cannot fail: the string is handed to `Column::from(&str)`.
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        let column = Column::from(s);
+        Ok(column)
+    }
+}
+
+impl fmt::Display for Column {
+    /// Renders `#relation.name` for a qualified column and `#name` otherwise.
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.write_str("#")?;
+        if let Some(relation) = &self.relation {
+            write!(f, "{}.", relation)?;
+        }
+        f.write_str(&self.name)
+    }
+}
diff --git a/datafusion-common/src/dfschema.rs b/datafusion-common/src/dfschema.rs
new file mode 100644
index 0000000..453932a
--- /dev/null
+++ b/datafusion-common/src/dfschema.rs
@@ -0,0 +1,688 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DFSchema is an extended schema struct that DataFusion uses to provide support for
+//! fields with optional relation names.
+
+use std::collections::HashSet;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::Column;
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use std::fmt::{Display, Formatter};
+
+/// A reference-counted reference to a `DFSchema`.
+pub type DFSchemaRef = Arc<DFSchema>;
+
+/// DFSchema wraps an Arrow schema and adds relation names
+///
+/// `DFSchema::new` rejects duplicate and ambiguous field names; the field list
+/// is private, so code outside this module goes through the constructors.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DFSchema {
+ /// Fields in schema order; each may carry an optional qualifier.
+ fields: Vec<DFField>,
+}
+
+impl DFSchema {
+    /// Creates an empty `DFSchema`
+    pub fn empty() -> Self {
+        Self { fields: vec![] }
+    }
+
+    /// Create a new `DFSchema` from a list of fields.
+    ///
+    /// # Errors
+    ///
+    /// Returns `DataFusionError::Plan` if the fields contain
+    /// * two qualified fields with the same qualifier and name,
+    /// * two unqualified fields with the same name, or
+    /// * a qualified field and an unqualified field with the same name, which
+    ///   would make an unqualified reference to that name ambiguous.
+    pub fn new(fields: Vec<DFField>) -> Result<Self> {
+        let mut qualified_names = HashSet::new();
+        let mut unqualified_names = HashSet::new();
+
+        for field in &fields {
+            if let Some(qualifier) = field.qualifier() {
+                if !qualified_names.insert((qualifier, field.name())) {
+                    return Err(DataFusionError::Plan(format!(
+                        "Schema contains duplicate qualified field name '{}'",
+                        field.qualified_name()
+                    )));
+                }
+            } else if !unqualified_names.insert(field.name()) {
+                return Err(DataFusionError::Plan(format!(
+                    "Schema contains duplicate unqualified field name '{}'",
+                    field.name()
+                )));
+            }
+        }
+
+        // check for mix of qualified and unqualified field with same unqualified name
+        // note that we need to sort the contents of the HashSet first so that errors are
+        // deterministic (HashSet iteration order is unspecified). The sort key is the
+        // rendered `qualifier.name` string; `sort_by_cached_key` builds it once per
+        // entry instead of twice per comparison.
+        let mut qualified_names = qualified_names
+            .into_iter()
+            .collect::<Vec<(&String, &String)>>();
+        qualified_names
+            .sort_by_cached_key(|(qualifier, name)| format!("{}.{}", qualifier, name));
+        for (qualifier, name) in &qualified_names {
+            if unqualified_names.contains(name) {
+                return Err(DataFusionError::Plan(format!(
+                    "Schema contains qualified field name '{}.{}' \
+                    and unqualified field name '{}' which would be ambiguous",
+                    qualifier, name, name
+                )));
+            }
+        }
+        Ok(Self { fields })
+    }
+
+    /// Create a `DFSchema` from an Arrow schema, qualifying every field with
+    /// `qualifier`.
+    pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) -> Result<Self> {
+        Self::new(
+            schema
+                .fields()
+                .iter()
+                .map(|f| DFField::from_qualified(qualifier, f.clone()))
+                .collect(),
+        )
+    }
+
+    /// Combine two schemas: the fields of `self` followed by those of `schema`.
+    ///
+    /// # Errors
+    ///
+    /// Fails under the same conditions as [`DFSchema::new`], e.g. when both
+    /// schemas contain the same qualified field.
+    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
+        let mut fields = self.fields.clone();
+        fields.extend_from_slice(schema.fields().as_slice());
+        Self::new(fields)
+    }
+
+    /// Merge a schema into self
+    ///
+    /// A field of `other_schema` is skipped when it already resolves to exactly one
+    /// field of `self` (by qualified name for qualified fields, by unqualified name
+    /// for unqualified ones); all other fields are appended in order. Unlike
+    /// [`DFSchema::join`], the resulting field list is not re-validated.
+    pub fn merge(&mut self, other_schema: &DFSchema) {
+        for field in other_schema.fields() {
+            // skip duplicate columns
+            let duplicated_field = match field.qualifier() {
+                Some(q) => self.field_with_name(Some(q.as_str()), field.name()).is_ok(),
+                // for unqualified columns, check as unqualified name
+                None => self.field_with_unqualified_name(field.name()).is_ok(),
+            };
+            if !duplicated_field {
+                self.fields.push(field.clone());
+            }
+        }
+    }
+
+    /// Get a list of fields
+    pub fn fields(&self) -> &Vec<DFField> {
+        &self.fields
+    }
+
+    /// Returns an immutable reference of a specific `Field` instance selected using an
+    /// offset within the internal `fields` vector
+    ///
+    /// # Panics
+    ///
+    /// Panics if `i` is out of bounds.
+    pub fn field(&self, i: usize) -> &DFField {
+        &self.fields[i]
+    }
+
+    /// Find the index of the first column with the given unqualified name
+    ///
+    /// Qualifiers are ignored, so if several fields share `name` (e.g. `t1.c0` and
+    /// `t2.c0`) the index of the first one is returned.
+    pub fn index_of(&self, name: &str) -> Result<usize> {
+        self.fields
+            .iter()
+            .position(|field| field.name() == name)
+            .ok_or_else(|| {
+                DataFusionError::Plan(format!(
+                    "No field named '{}'. Valid fields are {}.",
+                    name,
+                    self.get_field_names()
+                ))
+            })
+    }
+
+    /// Find the index of the single field matching `qualifier` and `name`.
+    ///
+    /// With a qualifier, only qualified fields with the same qualifier and name
+    /// match; without one, every field with that name matches whatever its
+    /// qualifier.
+    ///
+    /// # Errors
+    ///
+    /// `DataFusionError::Plan` when nothing matches, `DataFusionError::Internal`
+    /// when more than one field matches.
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let mut matches = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifier and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                (None, Some(_)) | (None, None) => field.name() == name,
+            })
+            .map(|(idx, _)| idx);
+        match matches.next() {
+            None => Err(DataFusionError::Plan(format!(
+                "No field named '{}.{}'. Valid fields are {}.",
+                qualifier.unwrap_or("<unqualified>"),
+                name,
+                self.get_field_names()
+            ))),
+            Some(idx) => match matches.next() {
+                None => Ok(idx),
+                // found more than one matches
+                Some(_) => Err(DataFusionError::Internal(format!(
+                    "Ambiguous reference to qualified field named '{}.{}'",
+                    qualifier.unwrap_or("<unqualified>"),
+                    name
+                ))),
+            },
+        }
+    }
+
+    /// Find the index of the column with the given qualifier and name
+    ///
+    /// An unqualified `col` matches fields by name alone, regardless of their
+    /// qualifier.
+    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
+        self.index_of_column_by_name(col.relation.as_deref(), &col.name)
+    }
+
+    /// Find the field with the given name, using a qualified lookup when
+    /// `qualifier` is `Some` and an unqualified lookup otherwise
+    pub fn field_with_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<&DFField> {
+        if let Some(qualifier) = qualifier {
+            self.field_with_qualified_name(qualifier, name)
+        } else {
+            self.field_with_unqualified_name(name)
+        }
+    }
+
+    /// Find all fields whose unqualified name is `name`, ignoring qualifiers
+    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
+        self.fields
+            .iter()
+            .filter(|field| field.name() == name)
+            .collect()
+    }
+
+    /// Find the single field whose unqualified name is `name`
+    ///
+    /// # Errors
+    ///
+    /// `DataFusionError::Plan` if no field, or more than one field, has that name.
+    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> {
+        let matches = self.fields_with_unqualified_name(name);
+        match matches.len() {
+            0 => Err(DataFusionError::Plan(format!(
+                "No field with unqualified name '{}'. Valid fields are {}.",
+                name,
+                self.get_field_names()
+            ))),
+            1 => Ok(matches[0]),
+            _ => Err(DataFusionError::Plan(format!(
+                "Ambiguous reference to field named '{}'",
+                name
+            ))),
+        }
+    }
+
+    /// Find the field with the given qualified name
+    pub fn field_with_qualified_name(
+        &self,
+        qualifier: &str,
+        name: &str,
+    ) -> Result<&DFField> {
+        let idx = self.index_of_column_by_name(Some(qualifier), name)?;
+        Ok(self.field(idx))
+    }
+
+    /// Find the field with the given qualified column
+    pub fn field_from_column(&self, column: &Column) -> Result<&DFField> {
+        match &column.relation {
+            Some(r) => self.field_with_qualified_name(r, &column.name),
+            None => self.field_with_unqualified_name(&column.name),
+        }
+    }
+
+    /// Check to see if unqualified field names matches field names in Arrow schema
+    ///
+    /// Fields are compared pairwise by position. Only the first
+    /// `min(self.fields().len(), arrow_schema.fields().len())` pairs are checked,
+    /// so schemas with different field counts can still "match".
+    // NOTE(review): the length-mismatch case looks unintended, but callers may
+    // rely on it; confirm before adding a length check.
+    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
+        self.fields
+            .iter()
+            .zip(arrow_schema.fields().iter())
+            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
+    }
+
+    /// Strip all field qualifier in schema
+    pub fn strip_qualifiers(self) -> Self {
+        DFSchema {
+            fields: self
+                .fields
+                .into_iter()
+                .map(|f| f.strip_qualifier())
+                .collect(),
+        }
+    }
+
+    /// Replace all field qualifier with new value in schema
+    ///
+    /// The wrapped Arrow fields are reused as-is, so field metadata and dictionary
+    /// settings survive the requalification (rebuilding them with `Field::new`
+    /// would reset those to defaults).
+    pub fn replace_qualifier(self, qualifier: &str) -> Self {
+        DFSchema {
+            fields: self
+                .fields
+                .into_iter()
+                .map(|f| DFField::from_qualified(qualifier, f.field))
+                .collect(),
+        }
+    }
+
+    /// Get comma-separated list of field names for use in error messages
+    fn get_field_names(&self) -> String {
+        self.fields
+            .iter()
+            .map(|f| match f.qualifier() {
+                Some(qualifier) => format!("'{}.{}'", qualifier, f.name()),
+                None => format!("'{}'", f.name()),
+            })
+            .collect::<Vec<_>>()
+            .join(", ")
+    }
+}
+
+impl From<DFSchema> for Schema {
+    /// Convert DFSchema into a Schema
+    ///
+    /// Arrow fields carry no qualifier, so each `DFField`'s wrapped Arrow field is
+    /// used as-is. This keeps field metadata and dictionary settings intact for
+    /// qualified and unqualified fields alike, matching the `From<&DFSchema>`
+    /// conversion.
+    fn from(df_schema: DFSchema) -> Self {
+        Schema::new(df_schema.fields.into_iter().map(|f| f.field).collect())
+    }
+}
+
+impl From<&DFSchema> for Schema {
+    /// Builds an Arrow schema from clones of the wrapped Arrow fields; the
+    /// qualifiers are not carried over.
+    fn from(df_schema: &DFSchema) -> Self {
+        let arrow_fields = df_schema
+            .fields()
+            .iter()
+            .map(DFField::field)
+            .cloned()
+            .collect();
+        Schema::new(arrow_fields)
+    }
+}
+
+/// Create a `DFSchema` from an Arrow schema
+impl TryFrom<Schema> for DFSchema {
+    type Error = DataFusionError;
+
+    /// Every Arrow field becomes an unqualified `DFField`; validation is
+    /// delegated to `DFSchema::new`.
+    fn try_from(schema: Schema) -> std::result::Result<Self, Self::Error> {
+        let fields: Vec<DFField> = schema
+            .fields()
+            .iter()
+            .cloned()
+            .map(DFField::from)
+            .collect();
+        DFSchema::new(fields)
+    }
+}
+
+impl From<DFSchema> for SchemaRef {
+    /// Converts to an Arrow `Schema` first, then wraps it in an `Arc`.
+    fn from(df_schema: DFSchema) -> Self {
+        let schema: Schema = df_schema.into();
+        Arc::new(schema)
+    }
+}
+
+/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
+pub trait ToDFSchema: Sized {
+    /// Attempt to create a `DFSchema`
+    #[allow(clippy::wrong_self_convention)]
+    fn to_dfschema(self) -> Result<DFSchema>;
+
+    /// Attempt to create a `DFSchemaRef` by wrapping the result of
+    /// `to_dfschema` in an `Arc`
+    #[allow(clippy::wrong_self_convention)]
+    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
+        self.to_dfschema().map(Arc::new)
+    }
+}
+
+impl ToDFSchema for Schema {
+    #[allow(clippy::wrong_self_convention)]
+    fn to_dfschema(self) -> Result<DFSchema> {
+        <DFSchema as TryFrom<Schema>>::try_from(self)
+    }
+}
+
+impl ToDFSchema for SchemaRef {
+    #[allow(clippy::wrong_self_convention)]
+    fn to_dfschema(self) -> Result<DFSchema> {
+        // Move the Schema out when this is the only reference; otherwise fall back
+        // to cloning the shared Schema.
+        let schema = Self::try_unwrap(self).unwrap_or_else(|shared| shared.as_ref().clone());
+        DFSchema::try_from(schema)
+    }
+}
+
+impl ToDFSchema for Vec<DFField> {
+    /// Validates the fields through `DFSchema::new`.
+    fn to_dfschema(self) -> Result<DFSchema> {
+        DFSchema::new(self)
+    }
+}
+
+impl Display for DFSchema {
+    /// Writes the qualified name of every field, separated by `", "`.
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        let mut fields = self.fields.iter();
+        if let Some(first) = fields.next() {
+            f.write_str(&first.qualified_name())?;
+            for field in fields {
+                f.write_str(", ")?;
+                f.write_str(&field.qualified_name())?;
+            }
+        }
+        Ok(())
+    }
+}
+
+/// DFField wraps an Arrow field and adds an optional qualifier
+///
+/// The Arrow field holds the unqualified name, data type and nullability;
+/// `qualified_name()` renders `qualifier.name` when a qualifier is present.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DFField {
+ /// Optional qualifier (usually a table or relation name)
+ qualifier: Option<String>,
+ /// Arrow field definition
+ field: Field,
+}
+
+impl DFField {
+    /// Creates a new `DFField` whose Arrow field is built by `Field::new` from
+    /// `name`, `data_type` and `nullable`.
+    pub fn new(
+        qualifier: Option<&str>,
+        name: &str,
+        data_type: DataType,
+        nullable: bool,
+    ) -> Self {
+        let field = Field::new(name, data_type, nullable);
+        let qualifier = qualifier.map(String::from);
+        DFField { qualifier, field }
+    }
+
+    /// Wraps an existing Arrow field without a qualifier.
+    pub fn from(field: Field) -> Self {
+        DFField {
+            field,
+            qualifier: None,
+        }
+    }
+
+    /// Wraps an existing Arrow field under the given qualifier.
+    pub fn from_qualified(qualifier: &str, field: Field) -> Self {
+        DFField {
+            field,
+            qualifier: Some(String::from(qualifier)),
+        }
+    }
+
+    /// The unqualified name, borrowed from the wrapped Arrow field.
+    pub fn name(&self) -> &String {
+        self.field().name()
+    }
+
+    /// The data type of the wrapped Arrow field.
+    pub fn data_type(&self) -> &DataType {
+        self.field().data_type()
+    }
+
+    /// Whether the wrapped Arrow field accepts null values.
+    pub fn is_nullable(&self) -> bool {
+        self.field().is_nullable()
+    }
+
+    /// Renders `qualifier.name`, or just `name` when there is no qualifier.
+    pub fn qualified_name(&self) -> String {
+        match self.qualifier() {
+            Some(qualifier) => format!("{}.{}", qualifier, self.name()),
+            None => self.name().clone(),
+        }
+    }
+
+    /// Builds a `Column` carrying this field's qualifier (if any) and name.
+    pub fn qualified_column(&self) -> Column {
+        Column {
+            relation: self.qualifier().cloned(),
+            name: self.name().clone(),
+        }
+    }
+
+    /// Builds a `Column` with this field's name and no relation, even when the
+    /// field itself is qualified.
+    pub fn unqualified_column(&self) -> Column {
+        Column {
+            relation: None,
+            name: self.name().clone(),
+        }
+    }
+
+    /// The optional qualifier.
+    pub fn qualifier(&self) -> Option<&String> {
+        self.qualifier.as_ref()
+    }
+
+    /// The wrapped Arrow field.
+    pub fn field(&self) -> &Field {
+        &self.field
+    }
+
+    /// Consumes the field and returns it without a qualifier.
+    pub fn strip_qualifier(self) -> Self {
+        Self {
+            qualifier: None,
+            ..self
+        }
+    }
+}
+
+// Unit tests for DFField / DFSchema construction, joining, lookups and conversions.
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::datatypes::DataType;
+
+ // Without a qualifier, `qualified_name()` is just the field name.
+ #[test]
+ fn from_unqualified_field() {
+ let field = Field::new("c0", DataType::Boolean, true);
+ let field = DFField::from(field);
+ assert_eq!("c0", field.name());
+ assert_eq!("c0", field.qualified_name());
+ }
+
+ // With a qualifier, `qualified_name()` is `qualifier.name` while `name()` stays bare.
+ #[test]
+ fn from_qualified_field() {
+ let field = Field::new("c0", DataType::Boolean, true);
+ let field = DFField::from_qualified("t1", field);
+ assert_eq!("c0", field.name());
+ assert_eq!("t1.c0", field.qualified_name());
+ }
+
+ // `Display` for DFSchema joins the qualified names with ", ".
+ #[test]
+ fn from_unqualified_schema() -> Result<()> {
+ let schema = DFSchema::try_from(test_schema_1())?;
+ assert_eq!("c0, c1", schema.to_string());
+ Ok(())
+ }
+
+ // Every field of the Arrow schema receives the given qualifier.
+ #[test]
+ fn from_qualified_schema() -> Result<()> {
+ let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
+ assert_eq!("t1.c0, t1.c1", schema.to_string());
+ Ok(())
+ }
+
+ // Converting back to an Arrow schema drops the qualifiers; the expected string is
+ // the Arrow `Schema` display of the two unqualified fields.
+ #[test]
+ fn from_qualified_schema_into_arrow_schema() -> Result<()> {
+ let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
+ let arrow_schema: Schema = schema.into();
+ let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, \
+ Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }";
+ assert_eq!(expected, arrow_schema.to_string());
+ Ok(())
+ }
+
+ // Joining two differently-qualified schemas keeps all fields. `c0` is then only
+ // reachable through a qualifier (unqualified lookup is ambiguous), and a dotted
+ // string is not split into qualifier and name by the unqualified lookup.
+ #[test]
+ fn join_qualified() -> Result<()> {
+ let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
+ let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
+ let join = left.join(&right)?;
+ assert_eq!("t1.c0, t1.c1, t2.c0, t2.c1", join.to_string());
+ // test valid access
+ assert!(join.field_with_qualified_name("t1", "c0").is_ok());
+ assert!(join.field_with_qualified_name("t2", "c0").is_ok());
+ // test invalid access
+ assert!(join.field_with_unqualified_name("c0").is_err());
+ assert!(join.field_with_unqualified_name("t1.c0").is_err());
+ assert!(join.field_with_unqualified_name("t2.c0").is_err());
+ Ok(())
+ }
+
+ // Joining two schemas with the same qualifier yields duplicate qualified names,
+ // which `DFSchema::new` rejects with a planning error.
+ #[test]
+ fn join_qualified_duplicate() -> Result<()> {
+ let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
+ let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
+ let join = left.join(&right);
+ assert!(join.is_err());
+ assert_eq!(
+ "Error during planning: Schema contains duplicate \
+ qualified field name \'t1.c0\'",
+ &format!("{}", join.err().unwrap())
+ );
+ Ok(())
+ }
+
+ // Same as above for unqualified schemas.
+ #[test]
+ fn join_unqualified_duplicate() -> Result<()> {
+ let left = DFSchema::try_from(test_schema_1())?;
+ let right = DFSchema::try_from(test_schema_1())?;
+ let join = left.join(&right);
+ assert!(join.is_err());
+ assert_eq!(
+ "Error during planning: Schema contains duplicate \
+ unqualified field name \'c0\'",
+ &format!("{}", join.err().unwrap())
+ );
+ Ok(())
+ }
+
+ // A qualified schema joined with an unqualified one with disjoint names is valid;
+ // unqualified lookups find both kinds of fields, and an empty qualifier string is
+ // not treated as "no qualifier".
+ #[test]
+ fn join_mixed() -> Result<()> {
+ let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
+ let right = DFSchema::try_from(test_schema_2())?;
+ let join = left.join(&right)?;
+ assert_eq!("t1.c0, t1.c1, c100, c101", join.to_string());
+ // test valid access
+ assert!(join.field_with_qualified_name("t1", "c0").is_ok());
+ assert!(join.field_with_unqualified_name("c0").is_ok());
+ assert!(join.field_with_unqualified_name("c100").is_ok());
+ assert!(join.field_with_name(None, "c100").is_ok());
+ // test invalid access
+ assert!(join.field_with_unqualified_name("t1.c0").is_err());
+ assert!(join.field_with_unqualified_name("t1.c100").is_err());
+ assert!(join.field_with_qualified_name("", "c100").is_err());
+ Ok(())
+ }
+
+ // `t1.c0` alongside an unqualified `c0` is rejected as ambiguous.
+ #[test]
+ fn join_mixed_duplicate() -> Result<()> {
+ let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
+ let right = DFSchema::try_from(test_schema_1())?;
+ let join = left.join(&right);
+ assert!(join.is_err());
+ assert_eq!(
+ "Error during planning: Schema contains qualified \
+ field name \'t1.c0\' and unqualified field name \'c0\' which would be ambiguous",
+ &format!("{}", join.err().unwrap())
+ );
+ Ok(())
+ }
+
+ // Failed lookups list the valid field names in their error messages.
+ #[test]
+ fn helpful_error_messages() -> Result<()> {
+ let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
+ let expected_help = "Valid fields are \'t1.c0\', \'t1.c1\'.";
+ assert!(schema
+ .field_with_qualified_name("x", "y")
+ .unwrap_err()
+ .to_string()
+ .contains(expected_help));
+ assert!(schema
+ .field_with_unqualified_name("y")
+ .unwrap_err()
+ .to_string()
+ .contains(expected_help));
+ assert!(schema
+ .index_of("y")
+ .unwrap_err()
+ .to_string()
+ .contains(expected_help));
+ Ok(())
+ }
+
+ #[test]
+ fn into() {
+ // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
+ let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Int64, true)]);
+ let arrow_schema_ref = Arc::new(arrow_schema.clone());
+
+ let df_schema =
+ DFSchema::new(vec![DFField::new(None, "c0", DataType::Int64, true)]).unwrap();
+ let df_schema_ref = Arc::new(df_schema.clone());
+
+ // Convert clones to DFSchema.
+ {
+ let arrow_schema = arrow_schema.clone();
+ let arrow_schema_ref = arrow_schema_ref.clone();
+
+ assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
+ assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
+ }
+
+ // Convert clones to DFSchemaRef.
+ {
+ let arrow_schema = arrow_schema.clone();
+ let arrow_schema_ref = arrow_schema_ref.clone();
+
+ assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
+ assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
+ }
+
+ // Now, consume the refs
+ assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
+ assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
+ }
+
+ // Two nullable boolean columns `c0` and `c1`.
+ fn test_schema_1() -> Schema {
+ Schema::new(vec![
+ Field::new("c0", DataType::Boolean, true),
+ Field::new("c1", DataType::Boolean, true),
+ ])
+ }
+
+ // Two nullable boolean columns `c100` and `c101`, disjoint from `test_schema_1`.
+ fn test_schema_2() -> Schema {
+ Schema::new(vec![
+ Field::new("c100", DataType::Boolean, true),
+ Field::new("c101", DataType::Boolean, true),
+ ])
+ }
+}
diff --git a/datafusion-common/src/lib.rs b/datafusion-common/src/lib.rs
index ac8ef62..8090fc0 100644
--- a/datafusion-common/src/lib.rs
+++ b/datafusion-common/src/lib.rs
@@ -15,6 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+mod column;
+mod dfschema;
mod error;
+pub use column::Column;
+pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use error::{DataFusionError, Result};