You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/05 17:30:49 UTC

[GitHub] [arrow] andygrove opened a new pull request #8839: ARROW-10732: [Rust] [DataFusion] Add SQL support for table/relation aliases and compound identifiers [WIP]

andygrove opened a new pull request #8839:
URL: https://github.com/apache/arrow/pull/8839


   The first commit adds DFSchema and DFField to support qualified field names in DataFusion.
   
   I am now working on updating the logical plan to use this new schema.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739448512


   Hey @andygrove . Thanks a lot for this!
   
   I would benefit from understanding the use-case for `DFSchema` at the physical plan. Note that this is primarily for my own understanding, as I am only familiar with qualifier names in SQL to disambiguate columns in expressions concerning more than one table - not in the representation of a statement at the logical and physical plan. Maybe you could give an example of where `arrow::Schema` is not sufficient at the physical level?
   
   My current understanding is that, without qualifiers, we can't write things like `(table1.a + 1) >= (table2.b - 1)`.
   
   What I am trying to understand is when do we need such an expression at the physical level. Typically, these plans require some form of join and are mapped to `filter(join(a, b))`, in which case I do not see how a qualifier is used: before the join there are two input nodes that are joined on a key (i.e. always an equality relationship between columns); after the join, there is a single node, and thus qualifiers are not needed.
   
   One use case case I see for this is when the join is itself over an expression, e.g. `JOIN ON (table1.a + 1) == (table2.b - 1)`. However, in this case, at the physical level, this can always be mapped to `join(projection())`. I.e. it seems to me that it is more of a convenience at building a logical statement than a necessity for executing such a statement.
   
   If the goal is that we can add the qualifier to the column name after the join, to desambiguate `table1.a` from `table2.a`, wouldn't it be easier to do that at the logical plan?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao edited a comment on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739448512


   Hey @andygrove . Thanks a lot for this!
   
   I would benefit from understanding the use-case for `DFSchema` at the physical plan. Note that this is primarily for my own understanding, as I am only familiar with qualifier names in SQL to disambiguate columns in expressions concerning more than one table - not in the representation of a statement at the logical and physical plan. Maybe you could give an example of where `arrow::Schema` is not sufficient at the physical level?
   
   My current understanding is that, without qualifiers, we can't write things like `(table1.a + 1) >= (table2.b - 1)`.
   
   What I am trying to understand is when do we need such an expression at the physical level. Typically, these plans require some form of join and are mapped to `filter(join(a, b))`, in which case I do not see how a qualifier is used: before the join there are two input nodes that are joined on a key (i.e. always an equality relationship between columns); after the join, there is a single node, and thus qualifiers are not needed.
   
   One use case case I see for this is when the join is itself over an expression, e.g. `JOIN ON (table1.a + 1) == (table2.b - 1)`. However, in this case, at the physical level, this can always be mapped to `join(projection())`. I.e. it seems to me that it is more of a convenience at building a logical statement than a necessity for executing such a statement.
   
   If the goal is that we can add the qualifier to the column name after the join, to desambiguate `table1.a` from `table2.a`, wouldn't it be easier to do that at the logical plan alone?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r537789220



##########
File path: rust/datafusion/src/logical_plan/display.rs
##########
@@ -241,26 +246,43 @@ impl<'a, 'b> PlanVisitor for GraphvizVisitor<'a, 'b> {
 
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::{DataType, Field};
-
     use super::*;
+    use crate::error::Result;
+    use crate::logical_plan::DFField;
+    use arrow::datatypes::DataType;
 
     #[test]
-    fn test_display_empty_schema() {
-        let schema = Schema::new(vec![]);
+    fn test_display_empty_schema() -> Result<()> {
+        let schema = DFSchema::new(vec![])?;
         assert_eq!("[]", format!("{}", display_schema(&schema)));
+        Ok(())
     }
 
     #[test]
-    fn test_display_schema() {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, true),
-        ]);
+    fn test_display_schema() -> Result<()> {
+        let schema = DFSchema::new(vec![
+            DFField::new(None, "id", DataType::Int32, false),
+            DFField::new(None, "first_name", DataType::Utf8, true),
+        ])?;
 
         assert_eq!(
             "[id:Int32, first_name:Utf8;N]",
             format!("{}", display_schema(&schema))
         );
+        Ok(())
+    }
+
+    #[test]
+    fn test_display_qualified_schema() -> Result<()> {

Review comment:
       👍 

##########
File path: rust/datafusion/src/physical_plan/mod.rs
##########
@@ -62,7 +62,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;
     /// Get the schema for this execution plan
-    fn schema(&self) -> SchemaRef;
+    fn schema(&self) -> DFSchemaRef;

Review comment:
       When I was saying "physical plan doesn't use DFSchema` I guess I was imagining that `ExecutionPlan::schema()` continued to return `SchemaRef` -- there may be some reason that `ExecutionPlan` needs to return a DFSchema, but I think the design would be cleaner if we avoided this




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove edited a comment on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove edited a comment on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-740176819


   Thanks for the continued reviews .... I think I misunderstood some of the earlier feedback. Also, I did run into a design issue when trying to leave the execution path using SchemaRef. I will see if I can find time this everning to explain this issue.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] Dandandan commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r536986106



##########
File path: rust/datafusion/src/logical_plan/dfschema.rs
##########
@@ -0,0 +1,415 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DFSchema is an extended schema struct that DataFusion uses to provide support for
+//! fields with optional relation names.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use std::fmt::{Display, Formatter};
+
+/// A reference-counted reference to a `DFSchema`.
+pub type DFSchemaRef = Arc<DFSchema>;
+
+/// DFSchema wraps an Arrow schema and adds relation names
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DFSchema {
+    /// Fields
+    fields: Vec<DFField>,
+}
+
+impl DFSchema {
+    /// Creates an empty `DFSchema`
+    pub fn empty() -> Self {
+        Self { fields: vec![] }
+    }
+
+    /// Create a new `DFSchema`
+    pub fn new(fields: Vec<DFField>) -> Result<Self> {
+        let mut qualified_names: HashSet<(&str, &str)> = HashSet::new();
+        let mut unqualified_names: HashSet<&str> = HashSet::new();
+        for field in &fields {
+            if let Some(qualifier) = field.qualifier() {
+                if !qualified_names.insert((qualifier, field.name())) {
+                    return Err(DataFusionError::Plan(format!(
+                        "Joined schema would contain duplicate qualified field name '{}'",
+                        field.qualified_name()
+                    )));
+                }
+            } else {
+                if !unqualified_names.insert(field.name()) {
+                    return Err(DataFusionError::Plan(
+                        format!("Joined schema would contain duplicate unqualified field name '{}'",
+                                field.name())
+                    ));
+                }
+            }
+        }
+
+        // check for mix of qualified and unqualified field with same unqualified name
+        // note that we need to sort the contents of the HashSet first so that errors are
+        // deterministic
+        let mut qualified_names: Vec<&(&str, &str)> = qualified_names.iter().collect();
+        qualified_names.sort_by(|a, b| {
+            let a = format!("{}.{}", a.0, a.1);
+            let b = format!("{}.{}", b.0, b.1);
+            a.cmp(&b)
+        });
+        for (qualifier, name) in &qualified_names {
+            if unqualified_names.contains(name) {
+                return Err(DataFusionError::Plan(format!(
+                    "Joined schema would contain qualified field name '{}.{}' \
+                    and unqualified field name '{}' which would be ambiguous",
+                    qualifier, name, name
+                )));
+            }
+        }
+        Ok(Self { fields })
+    }
+
+    /// Create a `DFSchema` from an Arrow schema
+    pub fn from(schema: &Schema) -> Result<Self> {
+        Self::new(
+            schema
+                .fields()
+                .iter()
+                .map(|f| DFField {
+                    field: f.clone(),
+                    qualifier: None,
+                })
+                .collect(),
+        )
+    }
+
+    /// Create a `DFSchema` from an Arrow schema
+    pub fn from_qualified(qualifier: &str, schema: &Schema) -> Result<Self> {
+        Self::new(
+            schema
+                .fields()
+                .iter()
+                .map(|f| DFField {
+                    field: f.clone(),
+                    qualifier: Some(qualifier.to_owned()),
+                })
+                .collect(),
+        )
+    }
+
+    /// Combine two schemas
+    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
+        let mut fields = self.fields.clone();
+        fields.extend_from_slice(schema.fields().as_slice());
+        Self::new(fields)
+    }
+
+    /// Get a list of fields
+    pub fn fields(&self) -> &Vec<DFField> {
+        &self.fields
+    }
+
+    /// Returns an immutable reference of a specific `Field` instance selected using an
+    /// offset within the internal `fields` vector
+    pub fn field(&self, i: usize) -> &DFField {
+        &self.fields[i]
+    }
+
+    /// Find the index of the column with the given name
+    pub fn index_of(&self, name: &str) -> Result<usize> {
+        for i in 0..self.fields.len() {

Review comment:
       This could use Vec::position




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r537600548



##########
File path: rust/datafusion/src/logical_plan/dfschema.rs
##########
@@ -136,12 +136,11 @@ impl DFSchema {
 
     /// Find the index of the column with the given name
     pub fn index_of(&self, name: &str) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            if self.fields[i].name() == name {
-                return Ok(i);
-            }
+        if let Some(i) = self.fields.iter().position(|f| f.name() == name) {
+            Ok(i)

Review comment:
       Thanks. Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] Dandandan commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r536985161



##########
File path: rust/datafusion/src/logical_plan/dfschema.rs
##########
@@ -0,0 +1,415 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DFSchema is an extended schema struct that DataFusion uses to provide support for
+//! fields with optional relation names.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use std::fmt::{Display, Formatter};
+
+/// A reference-counted reference to a `DFSchema`.
+pub type DFSchemaRef = Arc<DFSchema>;
+
+/// DFSchema wraps an Arrow schema and adds relation names
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DFSchema {
+    /// Fields
+    fields: Vec<DFField>,
+}
+
+impl DFSchema {
+    /// Creates an empty `DFSchema`
+    pub fn empty() -> Self {
+        Self { fields: vec![] }

Review comment:
       Would it make sense to make this a hashset? Or convert to vec in last step.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r537946735



##########
File path: rust/datafusion/src/logical_plan/expr.rs
##########
@@ -271,13 +274,14 @@ impl Expr {
     /// Returns the name of this expression based on [arrow::datatypes::Schema].
     ///
     /// This represents how a column with this expression is named when no alias is chosen
-    pub fn name(&self, input_schema: &Schema) -> Result<String> {
+    pub fn name(&self, input_schema: &DFSchema) -> Result<String> {
         create_name(self, input_schema)
     }
 
     /// Returns a [arrow::datatypes::Field] compatible with this expression.
-    pub fn to_field(&self, input_schema: &Schema) -> Result<Field> {
-        Ok(Field::new(
+    pub fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
+        Ok(DFField::new(
+            None, //TODO  qualifier

Review comment:
       oops, I actually forgot about that TODO.. thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r537030760



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -214,7 +212,7 @@ impl ExecutionContext {
             has_header: options.has_header,
             delimiter: Some(options.delimiter),
             projection: None,
-            projected_schema: csv.schema(),
+            projected_schema: Arc::new(DFSchema::from(&csv.schema())),

Review comment:
       We could make this code look better if we implemented `impl Into<DFSchemaRef> for SchemaRef` -- so then we could write something like `projected_schema: csv.schema().into(),`
   
   Doing so in some follow on PR would be totally fine
   

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -408,7 +406,7 @@ impl ExecutionContext {
             let path = Path::new(&path).join(&filename);
             let file = fs::File::create(path)?;
             let mut writer =
-                ArrowWriter::try_new(file.try_clone().unwrap(), plan.schema(), None)?;
+                ArrowWriter::try_new(file.try_clone().unwrap(), plan.schema().to_arrow_schema(), None)?;

Review comment:
       We could likewise implement `impl Into<Schema> for DFSchema` and so call `into()` rather than `to_arrow_schema()`. This is again just a stylistic thing




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] Dandandan commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739422076


   This is great! I didn't see any strange things now, code looks clean and it sounds like this could be integrated and further tested.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739449143


   > Hi @jorgecarleitao did you get a chance to read the design document? There is a link to it from the JIRA.
   
   Yeah, I missed that one and the whole discussion on the issue: https://docs.google.com/document/d/1BFo7ruJayCulAHLa9-noaHXbgcaAH_4LuOJFGJnDHkc/edit#heading=h.su3u27lcpr3l , sorry about that.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-740172359


   Ah and now I see, like so often, @jorgecarleitao  has beat me to the comment and has more thorough comments as well 👍 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove edited a comment on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove edited a comment on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739978964


   @alamb @jorgecarleitao @Dandandan This is ready for re-review.
   
   To recap:
   
   - At execution time we always* use the DataFusion schema from the plan now rather than the Arrow schema from the record batch
   - When converting the DataFusion schema to an Arrow schema for use in record batches, we use the fully qualified field names
   
   * _It is possible that there may still be one or two places where we are still using the batch schema but I think it will be easier to find those in the follow-up PRs where we add support for referencing columns by qualified names_


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-740176819


   Thanks for the continued reviews .... I think I misunderstand some of the earlier feedback. Also, I did run into a design issue when trying to leave the execution path using SchemaRef. I will see if I can find time this everning to explain this issue.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739528704


   @jorgecarleitao @alamb I've been looking at the question of whether the physical plan should use DFSchema. Here is the current (in master) implementation of the physical expression for Column:
   
   ```rust
   impl PhysicalExpr for Column {
       /// Get the data type of this expression, given the schema of the input
       fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
           Ok(input_schema
               .field_with_name(&self.name)?
               .data_type()
               .clone())
       }
   
       /// Decide whehter this expression is nullable, given the schema of the input
       fn nullable(&self, input_schema: &Schema) -> Result<bool> {
           Ok(input_schema.field_with_name(&self.name)?.is_nullable())
       }
   
       /// Evaluate the expression
       fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
           Ok(ColumnarValue::Array(
               batch.column(batch.schema().index_of(&self.name)?).clone(),
           ))
       }
   }
   ```
   
   As you can see, the `data_type` and `nullable` use the schema from the plan whereas the `evaluate` method uses the schema from the record batch, which is a little inconsistent. They should probably all use the same schema.
   
   The bigger issue though is that this expression is looking up columns by name, so how do we support qualified names here? I see the following choices:
   
   1. Have `ExecutionPlan.schema()` use `DFSchema` as I have done in this PR
   2. Use qualified names in the Arrow schema field name e.g. "t1.foo"
   3. Change the Column physical expression to refer to columns by index rather than name
   
   Maybe there are other options that I am not seeing?
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Implement DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739412569


   @alamb @jorgecarleitao @Dandandan fyi


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao edited a comment on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739448512


   Hey @andygrove . Thanks a lot for this!
   
   I would benefit from understanding the use-case for `DFSchema` at the physical plan. Note that this is primarily for my own understanding, as I am only familiar with qualifier names in SQL to disambiguate columns in expressions concerning more than one table - not in the representation of a statement at the physical plan. Maybe you could give an example of where `arrow::Schema` is not sufficient at the physical level?
   
   My current understanding is that, without qualifiers, we can't write things like `(table1.a + 1) >= (table2.b - 1)`.
   
   What I am trying to understand is when do we need such an expression at the physical level. Typically, these plans require some form of join and are mapped to `filter(join(a, b))`, in which case I do not see how a qualifier is used: before the join there are two input nodes that are joined on a key (i.e. always an equality relationship between columns); after the join, there is a single node, and thus qualifiers are not needed.
   
   One use case case I see for this is when the join is itself over an expression, e.g. `JOIN ON (table1.a + 1) == (table2.b - 1)`. However, in this case, at the physical level, this can always be mapped to `join(projection())`. I.e. it seems to me that it is more of a convenience at building a logical statement than a necessity for executing such a statement.
   
   If the goal is that we can add the qualifier to the column name after the join, to desambiguate `table1.a` from `table2.a`, wouldn't it be easier to do that at the logical plan alone?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Add SQL support for table/relation aliases and compound identifiers [WIP]

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739325065


   https://issues.apache.org/jira/browse/ARROW-10732


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739532745


   Thanks a lot for looking at this. All excellent points. I now see that this is tricky :)
   
   Thinking about what you wrote, if we plan the Logical as `t1.a, t2.a`, wouldn't the column names become `a, a` on the `RecordBatch`? i.e. there will be a discrepancy between the schema provided by `df.schema()` and the `RecordBatches::schema()` returned by `collect()`, no?
   
   I think that this will happen even if we pass `DFSchema` to the physical plan (1.) or use indexes (3.), as any map `qualified name -> unqualified` is lossy (the qualifier), and thus never recoverable at the `RecordBatch`'s schema.
   
   This IMO leaves us with 2., which is what I would try: change the physical planner to alias/rewrite column names with the qualifier when the physical plan is created. This will cause the resulting `RecordBatch`'s schema to have columns named `t1.a` and `t2.a`, thereby guaranteeing the invariant that the output schema of the physical execution matches the schema of the logical plan.
   
   I.e. The invariant that `SELECT t1.a, t2.a, c ...` yields a schema whose columns are named `["t1.a", "t2.a", "c"]` is preserved. 
   
   Note that we already do this when performing coercion: we preserve the logical schema name by injecting cast ops during physical (and not logical) planning, so that if the user wrote `SELECT sqrt(f32) ...`, the resulting name on the `RecordBatch::schema()` is `sqrt(f32)`, even if the physical operation performed was `sqrt(CAST(f32 as Float64))`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739978964


   @alamb @jorgecarleitao @Dandandan This is ready for re-review.
   
   To recap:
   
   - At execution time we always* use the DataFusion schema from the plan now rather than the Arrow schema from the record batch
   - When converting the DataFusion schema to an Arrow schema for use in record batches, we use the fully qualified field names
   
   - The 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r537945244



##########
File path: rust/datafusion/src/logical_plan/expr.rs
##########
@@ -271,13 +274,14 @@ impl Expr {
     /// Returns the name of this expression based on [arrow::datatypes::Schema].
     ///
     /// This represents how a column with this expression is named when no alias is chosen
-    pub fn name(&self, input_schema: &Schema) -> Result<String> {
+    pub fn name(&self, input_schema: &DFSchema) -> Result<String> {
         create_name(self, input_schema)
     }
 
     /// Returns a [arrow::datatypes::Field] compatible with this expression.
-    pub fn to_field(&self, input_schema: &Schema) -> Result<Field> {
-        Ok(Field::new(
+    pub fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
+        Ok(DFField::new(
+            None, //TODO  qualifier

Review comment:
       might be worth a ticket to track this work -- it would be a good initial project for someone to contribute maybe




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] Dandandan commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r537592747



##########
File path: rust/datafusion/src/logical_plan/dfschema.rs
##########
@@ -136,12 +136,11 @@ impl DFSchema {
 
     /// Find the index of the column with the given name
     pub fn index_of(&self, name: &str) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            if self.fields[i].name() == name {
-                return Ok(i);
-            }
+        if let Some(i) = self.fields.iter().position(|f| f.name() == name) {
+            Ok(i)

Review comment:
       small style thing but I guess we could do it roughly like this instead? 
   
   ```rust
       self.fields.iter().position(|f| f.name() == name)
           .ok_or_else(|| DataFusionError::Plan(format!("No field named '{}'", name)))
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739449229


   You may have a point about only needing this at the logical level. I am not sure, but I will take a look at this tomorrow.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739544637


   Thanks @jorgecarleitao I think that makes a lot of sense. Unfortunately I am running into some issues implementing this due to the physical planner calling into the logical planner to create names and it is getting hard to mix and match these schemas.
   
   I am going to have to take a step back and break this down into smaller steps I think.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-740266863


   @jorgecarleitao @alamb I now see where I got carried away with this :smile: .. this PR now updates 16 files instead of 41 and does not change the phyical plans.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb closed pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #8839:
URL: https://github.com/apache/arrow/pull/8839


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r537102128



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -408,7 +406,7 @@ impl ExecutionContext {
             let path = Path::new(&path).join(&filename);
             let file = fs::File::create(path)?;
             let mut writer =
-                ArrowWriter::try_new(file.try_clone().unwrap(), plan.schema(), None)?;
+                ArrowWriter::try_new(file.try_clone().unwrap(), plan.schema().to_arrow_schema(), None)?;

Review comment:
       I've broken this out into a separate PR https://github.com/apache/arrow/pull/8857




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739448886


   Hi @jorgecarleitao did you get a chance to read the design document? There is a link to it from the JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739934509


   > As you can see, the data_type and nullable use the schema from the plan whereas the evaluate method uses the schema from the record batch, which is a little inconsistent. They should probably all use the same schema.
   
   I agree -- I recommend using the schema from the plan for consistency.
   
   > This IMO leaves us with 2., which is what I would try: change the physical planner to alias/rewrite column names with the qualifier when the physical plan is created. This will cause the resulting RecordBatch's schema to have columns named t1.a and t2.a, thereby guaranteeing the invariant that the output schema of the physical execution matches the schema of the logical plan.
   
   
   I agree with this recommendation -- I would recommend when moving from logical --> physical plan, that we always use the fully qualified name of the field, which would avoid ambiguity. If we don't like `t1.foo` being sprinkled around in plans that only have one table or where the column names aren't ambiguous, we could implement a (logical plan) optimizer pass to remove unneeded qualifiers. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739968404


   Thanks for the feedback. I will try and get this rebased today.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove edited a comment on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
andygrove edited a comment on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739978964






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-740148943


   I went carefully through this. As I understand this PR, the reason we pass `DFSchema` into the `ExecutionPlan` is that we need to pass it to `PhysicalExpr.evaluate`, so that we can use `field_with_unqualified_name` on the `ColumnExpr`. 95% of the changes on the PR are derived from this change.
   
   IMO this introduces complexity on the physical execution that makes it more difficult to understand and use.
   
   IMO the signature `evaluate(&BatchRecord, &DFSchema)` indicates a design issue, as the recordBatch has all information required to be evaluated by `PhysicalExpr`.
   
   IMO we may be able to avoid this complexity by using `field_with_unqualified_name` on the physical planner, to create a `Schema` that is passed to the `ExecutionPlan` with the fields re-written, and creating `ColumnExpr` using the qualifier names.
   
   Specifically, the suggestion is to have the physical planner convert `DFSchema -> Schema` by writing `DFField` `(qual, name)` to `Field` `"qual.name"`, and, respectively, pass `"qual.name"` to `ColumnExpr`. IMO this would allow to keep all physical planning as it is in master, and IMO would make it easier to understand the physical execution and how the logical plan is being converted to the physical execution.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] Dandandan commented on a change in pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#discussion_r536986106



##########
File path: rust/datafusion/src/logical_plan/dfschema.rs
##########
@@ -0,0 +1,415 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DFSchema is an extended schema struct that DataFusion uses to provide support for
+//! fields with optional relation names.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use std::fmt::{Display, Formatter};
+
+/// A reference-counted reference to a `DFSchema`.
+pub type DFSchemaRef = Arc<DFSchema>;
+
+/// DFSchema wraps an Arrow schema and adds relation names
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DFSchema {
+    /// Fields
+    fields: Vec<DFField>,
+}
+
+impl DFSchema {
+    /// Creates an empty `DFSchema`
+    pub fn empty() -> Self {
+        Self { fields: vec![] }
+    }
+
+    /// Create a new `DFSchema`
+    pub fn new(fields: Vec<DFField>) -> Result<Self> {
+        let mut qualified_names: HashSet<(&str, &str)> = HashSet::new();
+        let mut unqualified_names: HashSet<&str> = HashSet::new();
+        for field in &fields {
+            if let Some(qualifier) = field.qualifier() {
+                if !qualified_names.insert((qualifier, field.name())) {
+                    return Err(DataFusionError::Plan(format!(
+                        "Joined schema would contain duplicate qualified field name '{}'",
+                        field.qualified_name()
+                    )));
+                }
+            } else {
+                if !unqualified_names.insert(field.name()) {
+                    return Err(DataFusionError::Plan(
+                        format!("Joined schema would contain duplicate unqualified field name '{}'",
+                                field.name())
+                    ));
+                }
+            }
+        }
+
+        // check for mix of qualified and unqualified field with same unqualified name
+        // note that we need to sort the contents of the HashSet first so that errors are
+        // deterministic
+        let mut qualified_names: Vec<&(&str, &str)> = qualified_names.iter().collect();
+        qualified_names.sort_by(|a, b| {
+            let a = format!("{}.{}", a.0, a.1);
+            let b = format!("{}.{}", b.0, b.1);
+            a.cmp(&b)
+        });
+        for (qualifier, name) in &qualified_names {
+            if unqualified_names.contains(name) {
+                return Err(DataFusionError::Plan(format!(
+                    "Joined schema would contain qualified field name '{}.{}' \
+                    and unqualified field name '{}' which would be ambiguous",
+                    qualifier, name, name
+                )));
+            }
+        }
+        Ok(Self { fields })
+    }
+
+    /// Create a `DFSchema` from an Arrow schema
+    pub fn from(schema: &Schema) -> Result<Self> {
+        Self::new(
+            schema
+                .fields()
+                .iter()
+                .map(|f| DFField {
+                    field: f.clone(),
+                    qualifier: None,
+                })
+                .collect(),
+        )
+    }
+
+    /// Create a `DFSchema` from an Arrow schema
+    pub fn from_qualified(qualifier: &str, schema: &Schema) -> Result<Self> {
+        Self::new(
+            schema
+                .fields()
+                .iter()
+                .map(|f| DFField {
+                    field: f.clone(),
+                    qualifier: Some(qualifier.to_owned()),
+                })
+                .collect(),
+        )
+    }
+
+    /// Combine two schemas
+    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
+        let mut fields = self.fields.clone();
+        fields.extend_from_slice(schema.fields().as_slice());
+        Self::new(fields)
+    }
+
+    /// Get a list of fields
+    pub fn fields(&self) -> &Vec<DFField> {
+        &self.fields
+    }
+
+    /// Returns an immutable reference of a specific `Field` instance selected using an
+    /// offset within the internal `fields` vector
+    pub fn field(&self, i: usize) -> &DFField {
+        &self.fields[i]
+    }
+
+    /// Find the index of the column with the given name
+    pub fn index_of(&self, name: &str) -> Result<usize> {
+        for i in 0..self.fields.len() {

Review comment:
       This could use Vec::find




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb edited a comment on pull request #8839: ARROW-10732: [Rust] [DataFusion] Integrate DFSchema as a step towards supporting qualified column names

Posted by GitBox <gi...@apache.org>.
alamb edited a comment on pull request #8839:
URL: https://github.com/apache/arrow/pull/8839#issuecomment-739934509


   > As you can see, the data_type and nullable use the schema from the plan whereas the evaluate method uses the schema from the record batch, which is a little inconsistent. They should probably all use the same schema.
   
   I agree -- I recommend using the schema from the plan for consistency.
   
   > This IMO leaves us with 2., which is what I would try: change the physical planner to alias/rewrite column names with the qualifier when the physical plan is created. This will cause the resulting RecordBatch's schema to have columns named t1.a and t2.a, thereby guaranteeing the invariant that the output schema of the physical execution matches the schema of the logical plan.
   
   
   I agree with @jorgecarleitao 's recommendation -- I would recommend when moving from logical --> physical plan, that we always use the fully qualified name of the field, which would avoid ambiguity. If we don't like `t1.foo` being sprinkled around in plans that only have one table or where the column names aren't ambiguous, we could implement a (logical plan) optimizer pass to remove unneeded qualifiers. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org