You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "berkaysynnada (via GitHub)" <gi...@apache.org> on 2023/06/06 14:50:52 UTC

[GitHub] [arrow-datafusion] berkaysynnada opened a new pull request, #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

berkaysynnada opened a new pull request, #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #6492.
   
   # Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   When writing query result to MemTable using SELECT .. INTO syntax, window aggregate projections without alias give error because of schema mismatch. As an example,
   
   `SELECT SUM(c1) OVER(ORDER BY c1) as sum1 INTO new_table FROM annotated_data_infinite`
   has no problem but
   `SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite`
   gives an error: 
   **Plan("Mismatch between schema and batches")**.
   
   This is because of the schema, which is created from the input LogicalPlan, has fields whose names are the result of [display_name()](https://github.com/apache/arrow-datafusion/blob/c7bfe15b4940ebff39f466212ccf32e891db7243/datafusion/expr/src/expr_schema.rs#L283) (It writes the whole expression, func + window specs). However, the RecordBatch's fields of partitions are the result of [physical_name()](https://github.com/apache/arrow-datafusion/blob/c7bfe15b4940ebff39f466212ccf32e891db7243/datafusion/core/src/physical_plan/planner.rs#L1601). (It writes only the function part of the expr).
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   In `create_memory_table()` function, there is a match arm that handles the case which the table does not exist. In that case, we initialize the MemTable with try_new(), comparing the fields one-to-one. For these not registered and newly created tables, we don't need to check LogicalPlan schema and the schema coming from partitions. By implementing a MemTable::new_not_registered() function, we can directly adopt the schema coming from partitions. In case of empty batches (Create Table statements without values inserted), we can use input plan's schema.
   
   # Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   Yes, the erroneous example above is tested.
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1237655925


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   > @alamb, @comphead: What do you think? Should we move forward with this approach?
   
   Yes, that is what I think we should do. If the overly verbose column (at the output) names are a problem, perhaps we can look into updating the planner to automatically add more reasonable aliases



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] berkaysynnada commented on pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#issuecomment-1580178810

   > We touched this window problem here [#5695 (comment)](https://github.com/apache/arrow-datafusion/issues/5695#issuecomment-1480248977)
   > 
   > so to fix the issue completely we need to realias correctly instead of shorten the name in physical plan
   > 
   > https://github.com/apache/arrow-datafusion/blob/26e1b20ea3362ea62cb713004a0636b8af6a16d7/datafusion/core/src/physical_plan/planner.rs#L1630
   
   In planner, modifying the lines
   ```
       let (name, e) = match e {
           Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
           _ => (physical_name(e)?, e),
       };
   ```
   to that
   ```
       let (name, e) = match e {
           Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
           _ => (e.canonical_name(), e), 
       };
   ```
   solves my issue. Is this what you mean by realiasing?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] berkaysynnada commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1221143198


##########
datafusion/core/src/execution/context.rs:
##########
@@ -518,7 +518,7 @@ impl SessionContext {
                 let physical = DataFrame::new(self.state(), input);
 
                 let batches: Vec<_> = physical.collect_partitioned().await?;
-                let table = Arc::new(MemTable::try_new(schema, batches)?);
+                let table = Arc::new(MemTable::new_not_registered(schema, batches));

Review Comment:
   Yes, my first suggestion is to use longer version of the name in planner (what is done in the LogicalPlan). However, there need to be many changes in tests, and the BoundedWindowAggExec lines may become too long. If it is not a problem, we can solve the issue like [that](https://github.com/apache/arrow-datafusion/pull/6566#issuecomment-1580178810)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] berkaysynnada commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1237677392


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   It should also be considered that we cannot support more than one column with the same alias, if we intend to shorten the names at the output by making realias.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1240749333


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   Related issues: https://github.com/apache/arrow-datafusion/issues/6543 and https://github.com/apache/arrow-datafusion/issues/6758



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] comphead commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "comphead (via GitHub)" <gi...@apache.org>.
comphead commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1237115792


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   @berkaysynnada thanks for checking that.
   
   I was also working on that.
   Changing `display_name` was the one I started with but in this case other scenarios will fail. When window plan created the DFS schema check name uniqueness from `display_name` not considering aliases. So this query will fail
   ```
   SELECT
     first_value(c9) OVER (PARTITION BY c2 ORDER BY c9) first_c9,
     first_value(c9) OVER (PARTITION BY c2 ORDER BY c9 DESC) first_c9_desc
   FROM aggregate_test_100
   ```
   
   I'm still thinking how to overcome that without breaking changes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1237601207


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   After exploring all options, I get the impression that this option that @alamb mentioned before:
   
   > Update `BoundedWindowAggExec` to produce the field names declared by the `WindowAggExec`
   
   is the most consistent path forward. TBC, my understanding of this is to make sure execs use names that are compatible with the plan. This will tend to produce overly verbose column names, but all the other options seems to suffer from some kind of a problem. AFAICT this approach is similar to what @berkaysynnada mentioned as [his first solution alternative in the issue description](https://github.com/apache/arrow-datafusion/issues/6492) as well.
   
   @alamb, @comphead: What do you think? Should we move forward with this approach?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] berkaysynnada commented on pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#issuecomment-1618127291

   We have observed that the mismatch problem is not only related to SELECT INTO's. Table creation with CREATE TABLE AS has the same problem while matching the schemas. 
   
   With my last commits, I have changed how the window expressions are named to be used in window executors. Now, window function columns of the batches have the same name as the plans.
   
   .slt and unit tests are edited accordingly. We now have longer plans and column names, but the problem is solved. To make progress, it seems better to apply this PR and open another issue that shortens the window names.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] berkaysynnada commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1237677392


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   It should also be considered that we cannot support more than one column with the same alias, if we intend to shorten the names at the output by realiasing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#issuecomment-1619201022

   @comphead, I agree that it is better to get correct behavior first in this PR and fix the bug, and make name shortening the subject of a follow-on PR.
   
   @alamb, does this look good to you? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1221355361


##########
datafusion/core/src/execution/context.rs:
##########
@@ -518,7 +518,7 @@ impl SessionContext {
                 let physical = DataFrame::new(self.state(), input);
 
                 let batches: Vec<_> = physical.collect_partitioned().await?;
-                let table = Arc::new(MemTable::try_new(schema, batches)?);
+                let table = Arc::new(MemTable::new_not_registered(schema, batches));

Review Comment:
   I think using the same names in physical and logical plans is preferable because the rest of the parts of the code expects this and sometimes makes assumptions that it is the case (because it mostly is). 
   
   If we don't make the logical and physical plans match up, I predict we will continue to hit a long tail of bugs related to schema mismatches, only when using window functions related to the discrepancy.
   
   If the long display name is a problem (and I can see how it would be) perhaps we can figure out how to make `display_name` produce something shorter for window functions other than serializing the entire window definition
   
   Here is what postgres does:
   
   ```sql
   postgres=# select first_value(x) over (order by x) from foo;
    first_value
   -------------
              1
   (1 row)
   ```
   
   We probably need to do something more sophisticated as DataFusion needs distinct column names. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#issuecomment-1599414571

   Marking as draft as we work through feedback


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#issuecomment-1580432529

   > solves my issue. Is this what you mean by realiasing?
   
   Yes, that is what I was referring to


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1220199844


##########
datafusion/core/src/execution/context.rs:
##########
@@ -518,7 +518,7 @@ impl SessionContext {
                 let physical = DataFrame::new(self.state(), input);
 
                 let batches: Vec<_> = physical.collect_partitioned().await?;
-                let table = Arc::new(MemTable::try_new(schema, batches)?);
+                let table = Arc::new(MemTable::new_not_registered(schema, batches));

Review Comment:
   It seems like the core problem here is `schema` (aka the schema of `input`) does not actually match the schema of the batches that are produced. 
   
   As I think @comphead is suggesting in https://github.com/apache/arrow-datafusion/pull/6566#pullrequestreview-1465439929, this PR seems to be trying to workaround the deeper problem of the window exec producing an output schema (different column names) than the LogicalPlan says. 
   
   Have you looked into making the names consistent?
   



##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -73,6 +75,26 @@ impl MemTable {
         }
     }
 
+    /// Create a new in-memory table from the record batches.
+    /// In case of empty table, the schema is inferred from the input plan.
+    pub fn new_not_registered(

Review Comment:
   I think this code effectively ignores the `schema` argument if any of `partitions` has a RecordBatch and uses that RecordBatch's schema instead. 
   
   I think this is a surprising behavior and might mask errors in the future



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1225302032


##########
datafusion/sql/src/select.rs:
##########
@@ -555,3 +565,288 @@ fn match_window_definitions(
     }
     Ok(())
 }
+
+fn create_function_physical_name(
+    fun: &str,
+    distinct: bool,
+    args: &[Expr],
+) -> Result<String> {
+    let names: Vec<String> = args
+        .iter()
+        .map(|e| create_physical_name(e, false))
+        .collect::<Result<_>>()?;
+
+    let distinct_str = match distinct {
+        true => "DISTINCT ",
+        false => "",
+    };
+    Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
+}
+
+fn physical_name(e: &Expr) -> Result<String> {
+    create_physical_name(e, true)
+}
+
+fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
+    match e {
+        Expr::Column(c) => {
+            if is_first_expr {
+                Ok(c.name.clone())
+            } else {
+                Ok(c.flat_name())
+            }
+        }
+        Expr::Alias(_, name) => Ok(name.clone()),
+        Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
+        Expr::Literal(value) => Ok(format!("{value:?}")),
+        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
+            let left = create_physical_name(left, false)?;
+            let right = create_physical_name(right, false)?;
+            Ok(format!("{left} {op} {right}"))
+        }
+        Expr::Case(case) => {

Review Comment:
   For example, this appears to the same code as https://github.com/apache/arrow-datafusion/blob/1af846bd8de387ce7a6e61a2008917a7610b9a7b/datafusion/physical-expr/src/expressions/case.rs#L66-L77
   
   If we ever changed the code in phsical-expr and did not change this code, would that cause problems?



##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   I am sorry if my past comments have been confusing. Here is what I was trying to say earlier in https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1221355361:
   
   I ran this command to get some logs (with the extra debug in PR https://github.com/apache/arrow-datafusion/pull/6626):
   
   ```shell
   RUST_LOG=debug cargo test --test sqllogictests -- ddl 2>&1 | tee /tmp/debug.log
   ```
   
   Here is the content of debug.log:  [debug.log](https://github.com/apache/arrow-datafusion/files/11712340/debug.log)
   
   
   
   From the log, here is the `LogialPlan` that shows the  `WindowAggr` declares it makes a column named `SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` (yes that whole thing!)
   
   ```
       Projection: SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, test_table.c2, test_table.c3
         WindowAggr: windowExpr=[[SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
           TableScan: test_table projection=[c1, c2, c3]
   ```
   
   
   Here is the final `ExecutionPlan`, also showing the same giant column as the declared output name:
   
   ```
   [2023-06-10T11:43:29Z DEBUG datafusion::physical_plan::planner] Optimized physical plan:
       ProjectionExec: expr=[SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(test_table.c1), c2@1 as c2, c3@2 as c3]
         BoundedWindowAggExec: wdw=[SUM(test_table.c1): Ok(Field { name: "SUM(test_table.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Float32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
           SortPreservingMergeExec: [c2@1 ASC NULLS LAST]
             SortExec: expr=[c2@1 ASC NULLS LAST]
               MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
   ```
   
   However, looking at the logs, what the execution plan actually produces a column named `SUM(test_table.c1)`:
   
   ```
   [2023-06-10T11:43:29Z DEBUG datafusion::datasource::memory] mem schema does not contain batches schema.
   
   Target_schema: Schema { fields: [
     Field { name: "SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
     Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
     Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
   }.
   
   
   Batches Schema: Schema { fields: [
     Field { name: "SUM(test_table.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
     Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
     Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
   }
   ```
   
   Thus, what I was trying to say earlier is that I think the root of the problem is the mismatch between what the plans say the field name of the output is and what the field name that the WindowExec is actually producing.
   
   So I think we should fix this bug by resolving the mismatch. Either:
   1. Update the Logical/Physical plans so the field names of WindowAgg matches what the `BoundedWindowAggExec` actually produces
   2. OR Update `BoundedWindowAggExec` to produce the field names declared by the `WindowAggExec
   
   



##########
datafusion/sql/src/select.rs:
##########
@@ -555,3 +565,288 @@ fn match_window_definitions(
     }
     Ok(())
 }
+
+fn create_function_physical_name(

Review Comment:
   Does this functionality need to remain in sync with the creation of physical names? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] comphead commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "comphead (via GitHub)" <gi...@apache.org>.
comphead commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1225593824


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   Thanks @alamb @berkaysynnada 
   Would you mind if I also open a small PR for this? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] berkaysynnada commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1236971325


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   Sorry for the late reply. I have tried both suggestions @alamb:
   1) We need to modify 3 parts of the code: 
   a- In the `project()` function in select.rs, the final schema will be constructed with a shortened form of the window function. 
   ```
       fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
           match self {
               Expr::Column(c) => Ok(DFField::new(
                   c.relation.clone(),
                   &c.name,
                   self.get_type(input_schema)?,
                   self.nullable(input_schema)?,
               )),
               _ => {
                   Ok(DFField::new_unqualified(
                       &self.display_name()?,
                       self.get_type(input_schema)?,
                       self.nullable(input_schema)?,
                   ))
               }
           }
       }
   ```
   is expanded with that arm:
                   
   ```
             Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
                   Ok(DFField::new_unqualified(
                       &vec![create_function_name(&fun.to_string(), false, args)?].join(" "),
                       self.get_type(input_schema)?,
                       self.nullable(input_schema)?,
                   ))
               }
   ```
   b- In the `project()` function again, qualified wildcard columns are normalized. However, the column name is in the longer form, and the schema of the plan is in the shorter form. Therefore, we also change the `expr_as_column_expr()` function so that window function expressions are converted to column expressions with the shortened column name format, which can be copied from the schema.
   c- `PushDownProjection` rule again creates new column expressions with `display_name() `function (which returns the long format) in the window handling arm. These column names also need to be shortened to satisfy subsequent control.
   
   **My opinion:** Can we directly change the `display_name()` function for window functions such that only function name and arguments are returned? Thus we don't need to change any of what I mentioned above.
   ```
           Expr::WindowFunction(WindowFunction {
               fun, args, window_frame, partition_by, order_by,
           }) => {
               let mut parts: Vec<String> =
                   vec![create_function_name(&fun.to_string(), false, args)?];
               if !partition_by.is_empty() {
                   parts.push(format!("PARTITION BY {partition_by:?}"));
               }
               if !order_by.is_empty() {
                   parts.push(format!("ORDER BY {order_by:?}"));
               }
               parts.push(format!("{window_frame}"));
               Ok(parts.join(" "))
           }
   ```
   new version:
   ```
   Expr::WindowFunction(WindowFunction {
               fun, args, ..
           }) => {
               let mut parts: Vec<String> =
                   vec![create_function_name(&fun.to_string(), false, args)?];
               Ok(parts.join(" "))
           }
   ```
   
   
   2) We can only change `create_window_expr()` such that it creates the window name with `display_name()` rather than `physical_name()`, but it makes the plans longer, also lots of test change burden.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] berkaysynnada commented on pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#issuecomment-1582488017

   I have updated the code and took out the old stuff. When the final window plan is built, we realias the aggregate window expressions. It is worked for SELECT INTO case. I'd like to know what you think about these changes. Thank you.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1225834931


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   > Would you mind if I also open a small PR for this?
   
   I would be very much appreciative, personally



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1237601207


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   After exploring all options, I get the impression that this option that @alamb mentioned before:
   
   > Update `BoundedWindowAggExec` to produce the field names declared by the `WindowAggExec`
   
   is the most consistent path forward. It tends to produce overly verbose column names, but all the other options seems to suffer from some kind of a problem. AFAICT this approach is similar to what @berkaysynnada mentioned as [his first solution alternative in the issue description](https://github.com/apache/arrow-datafusion/issues/6492) as well.
   
   @alamb, @comphead: What do you think? Should we move forward with this approach?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] berkaysynnada commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1236971325


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   Sorry for the late reply. I have tried both suggestions @alamb:
   1) We need to modify 3 parts of the code: 
   a- In the `project()` function in select.rs, the final schema will be constructed with a shortened form of the window function. 
   ```
       fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
           match self {
               Expr::Column(c) => Ok(DFField::new(
                   c.relation.clone(),
                   &c.name,
                   self.get_type(input_schema)?,
                   self.nullable(input_schema)?,
               )),
               _ => {
                   Ok(DFField::new_unqualified(
                       &self.display_name()?,
                       self.get_type(input_schema)?,
                       self.nullable(input_schema)?,
                   ))
               }
           }
       }
   ```
   is expanded with that arm:
                   
   ```
             Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
                   Ok(DFField::new_unqualified(
                       &vec![create_function_name(&fun.to_string(), false, args)?].join(" "),
                       self.get_type(input_schema)?,
                       self.nullable(input_schema)?,
                   ))
               }
   ```
   b- In the `project()` function again, qualified wildcard columns are normalized. However, the column name is in the longer form, and the schema of the plan is in the shorter form. Therefore, we also change the `expr_as_column_expr()` function so that window function expressions are converted to column expressions with the shortened column name format, which can be copied from the schema.
   c- `PushDownProjection` rule again creates new column expressions with `display_name() `function (which returns the long format) in the window handling arm. These column names also need to be shortened to satisfy subsequent control.
   
   **My opinion:** Can we directly change the `display_name()` function for window functions such that only function name and arguments are returned? Thus we don't need to change any of what I mentioned above.
   ```
           Expr::WindowFunction(WindowFunction {
               fun, args, window_frame, partition_by, order_by,
           }) => {
               let mut parts: Vec<String> =
                   vec![create_function_name(&fun.to_string(), false, args)?];
               if !partition_by.is_empty() {
                   parts.push(format!("PARTITION BY {partition_by:?}"));
               }
               if !order_by.is_empty() {
                   parts.push(format!("ORDER BY {order_by:?}"));
               }
               parts.push(format!("{window_frame}"));
               Ok(parts.join(" "))
           }
   ```
   new version:
   ```
   Expr::WindowFunction(WindowFunction {
               fun, args, ..
           }) => {
               let mut parts: Vec<String> =
                   vec![create_function_name(&fun.to_string(), false, args)?];
               Ok(parts.join(" "))
           }
   ```
   
   
   2) We can only change `create_window_expr()` such that it creates the window name with `display_name()` rather than `physical_name()`, but it makes the plans longer, also lots of test change burden.
   
   I would like to wrap up this PR and any thoughts you have would be really helpful. Can you review the alternatives above when you get a chance? Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] berkaysynnada commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1237677392


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   It should also be considered that we cannot support more than one column with the same aliases, if we intend to shorten the names at the output by making realias.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] comphead commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "comphead (via GitHub)" <gi...@apache.org>.
comphead commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1237666200


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   Agree, we need to get back on column naming convention as currently long names are not user friendly and not useful without aliases in nested queries



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1237678899


##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .iter()
                 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
                 .collect::<Result<Vec<Expr>>>()?;
-
+            if select.into.is_some() {
+                for expr in select_exprs_post_aggr.iter_mut() {
+                    if let Expr::Column(_) = expr.clone() {
+                        *expr = expr.clone().alias(physical_name(expr)?);
+                    }
+                }
+            }

Review Comment:
   Sounds good. We will go forward with that approach and @berkaysynnada will update you guys of the progress.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] comphead commented on pull request #6566: Mismatch in MemTable of Select Into when projecting on aggregate window functions

Posted by "comphead (via GitHub)" <gi...@apache.org>.
comphead commented on PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#issuecomment-1618727650

   > We have observed that the mismatch problem is not only related to SELECT INTO's. Table creation with CREATE TABLE AS has the same problem while matching the schemas.
   > 
   > With my last commits, I have changed how the window expressions are named to be used in window executors. Now, window function columns of the batches have the same name as the plans.
   > 
   > .slt and unit tests are edited accordingly. We now have longer plans and column names, but the problem is solved. To make progress, I think applying this PR and opening another issue that shortens the window names would work better.
   
   Thanks @berkaysynnada I will check this today. You right, the problem is related to create mem table and can be reproduced with 
   ```
   let sql = "create table t as SELECT SUM(c1) OVER(ORDER BY c1) FROM (select 1 c1)"
   ```
   
   To make it window aliases shorten this is good idea, I can create a followup issue. I was looking into this for last couple of days and there is mess in `select.rs` for window processing, actually leading DFSchema to fail on shortened names even if they aliased 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org