Posted to github@arrow.apache.org by "berkaysynnada (via GitHub)" <gi...@apache.org> on 2023/05/30 19:12:08 UTC

[GitHub] [arrow-datafusion] berkaysynnada opened a new issue, #6492: Mismatch in MemTable (Select Into with no alias)

berkaysynnada opened a new issue, #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492

   ### Describe the bug
   
   `SELECT SUM(c1) OVER(ORDER BY c1) as sum1 INTO new_table FROM annotated_data_infinite` 
   runs without a problem, but
   `SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite` 
   gives an error: **Plan("Mismatch between schema and batches")**. 
   
   The reason is that in `MemTable::try_new()`, the given schema and the partitions' schemas do not match. I tracked this down and saw that the schema created from the input LogicalPlan has fields whose names come from [display_name()](https://github.com/apache/arrow-datafusion/blob/c7bfe15b4940ebff39f466212ccf32e891db7243/datafusion/expr/src/expr_schema.rs#L283), which writes out the whole expression (function + window specs). However, when there is no alias, the fields of the partitions' RecordBatches come from [physical_name()](https://github.com/apache/arrow-datafusion/blob/c7bfe15b4940ebff39f466212ccf32e891db7243/datafusion/core/src/physical_plan/planner.rs#L1601), which writes only the function part of the expression.
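   
   For illustration, here is a minimal, self-contained sketch of why the name comparison fails (the field names below are representative of the two naming schemes, not the exact DataFusion output):
   ```rust
   use arrow::datatypes::{DataType, Field, Schema};
   
   fn main() {
       // Field name produced from the logical plan via display_name(): the full expression text.
       let logical = Schema::new(vec![Field::new(
           "SUM(annotated_data_infinite.c1) ORDER BY [annotated_data_infinite.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW",
           DataType::Int64,
           true,
       )]);
       // Field name produced by the physical planner via physical_name(): the function part only.
       let physical = Schema::new(vec![Field::new(
           "SUM(annotated_data_infinite.c1)",
           DataType::Int64,
           true,
       )]);
       // Schema::contains requires field names to match exactly, so this returns false --
       // the same kind of mismatch that MemTable::try_new() reports.
       assert!(!logical.contains(&physical));
   }
   ```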
   
   
   ### To Reproduce
   
   SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite
   
   ### Expected behavior
   
   I have two solution approaches:
   
   1) `create_window_expr()` could take the name from `display_name()` while constructing the window expr. However, this would require many changes in tests, and the exec lines would become very long, like:
   `ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]`
   Maybe the `Display` implementations could be changed?
   
   2) The `contains()` function implemented for `Schema` is used in `MemTable::try_new()` to match fields, and it relies on the `contains()` implemented for `Field`, which checks one-to-one equality of every element of the `Field` struct. Just for the name element, we could relax the equality to something like `schema_field.name().starts_with(partition_field.name())`. If we prefer not to change the `contains()` function, maybe we can write a specialized function like:
   ```rust
   use arrow::datatypes::SchemaRef;
   use arrow::record_batch::RecordBatch;
   
   fn validate_partitions_with_schema(schema: &SchemaRef, partitions: &[Vec<RecordBatch>]) -> bool {
       partitions.iter().flatten().all(|batch| {
           let batch_schema = batch.schema();
           // Field counts must agree.
           batch_schema.fields().len() == schema.fields().len()
               && schema
                   .fields()
                   .iter()
                   .zip(batch_schema.fields().iter())
                   .all(|(schema_field, batch_field)| {
                       // Relax the name check: the schema field may carry the full display
                       // name while the batch field carries only the physical name.
                       schema_field.name().starts_with(batch_field.name())
                           && schema_field.data_type() == batch_field.data_type()
                   })
       })
   }
   ```
   But this approach also does not seem solid to me.
   
   ### Additional context
   
   Any advice is welcome. I will work on the fix once we reach common ground.




[GitHub] [arrow-datafusion] byteink commented on issue #6492: Mismatch in MemTable (Select Into with aggregate window functions having no alias)

Posted by "byteink (via GitHub)" <gi...@apache.org>.
byteink commented on issue #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492#issuecomment-1573076672

   Another similar case of failure:
   ```shell
   DataFusion CLI v25.0.0
   ❯ create table t (a int not null);
   0 rows in set. Query took 0.004 seconds.
   
   ❯ insert into t values(1);
   Error during planning: Inserting query must have the same schema with the table.
   ```
   The two non-matching schemas are:
   **input schema**: Schema { fields: [Field { name: "a", data_type: Int32, nullable: **true**, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
   **table schema**: Schema { fields: [Field { name: "a", data_type: Int32, nullable: **false**, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
     
       
        
   Can we ignore the schema part of the batches and focus only on the actual data?
   We could then use `RecordBatch::try_new` to check whether the data in each RecordBatch matches the schema of the target table.
   ```rust
   impl MemTable {
       /// Create a new in-memory table from the provided schema and record batches
       pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
           let mut batches = Vec::with_capacity(partitions.len());
           for partition in partitions {
               let new_partition = partition
                   .iter()
                   .map(|batch| {
                       RecordBatch::try_new(schema.clone(), batch.columns().to_vec())
                           .map_err(DataFusionError::ArrowError)
                   })
                   .collect::<Result<Vec<_>>>()?;
               batches.push(Arc::new(RwLock::new(new_partition)));
           }
           Ok(Self { schema, batches })
       }
   }
   ```
   




[GitHub] [arrow-datafusion] alamb closed issue #6492: Mismatch in MemTable (Select Into with aggregate window functions having no alias)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed issue #6492: Mismatch in MemTable (Select Into with aggregate window functions having no alias)
URL: https://github.com/apache/arrow-datafusion/issues/6492




[GitHub] [arrow-datafusion] berkaysynnada commented on issue #6492: Mismatch in MemTable (Select Into with aggregate window functions having no alias)

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on issue #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492#issuecomment-1576357068

   In the `create_memory_table()` function, there is a match arm that handles the case in which the table does not exist. In that case, we initialize the MemTable with `try_new()`, comparing the fields one-to-one. For these unregistered, newly created tables, do we really need to check the LogicalPlan schema against the schema coming from the partitions? By implementing a `MemTable::new()` function, could we directly adopt the schema coming from the partitions? (Once the schema check in `try_new()` is removed, there would also be no need to return a `Result`.)
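   
   A rough sketch of what I mean, assuming the `batches: Vec<Arc<RwLock<Vec<RecordBatch>>>>` layout implied by the snippet above (the lock type and field names are assumptions, not the actual MemTable internals):
   ```rust
   use std::sync::Arc;
   use arrow::record_batch::RecordBatch;
   use tokio::sync::RwLock;
   
   impl MemTable {
       /// Hypothetical constructor for newly created (not yet registered) tables:
       /// adopt the schema of the batches themselves instead of validating them
       /// against the plan schema, so there is no error to return.
       /// (Assumes at least one batch exists; empty tables are discussed below.)
       pub fn new(partitions: Vec<Vec<RecordBatch>>) -> Self {
           let schema = partitions
               .iter()
               .flatten()
               .next()
               .expect("at least one batch")
               .schema();
           let batches = partitions
               .into_iter()
               .map(|partition| Arc::new(RwLock::new(partition)))
               .collect();
           Self { schema, batches }
       }
   }
   ```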
   




[GitHub] [arrow-datafusion] byteink commented on issue #6492: Mismatch in MemTable (Select Into with aggregate window functions having no alias)

Posted by "byteink (via GitHub)" <gi...@apache.org>.
byteink commented on issue #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492#issuecomment-1576183654

   I think there may be two issues that need to be considered.
   
   # 1. What is the schema of a table created with initial contents (particularly regarding nullability)?
   ## MySQL
   Query expressions and column definitions can appear together in the create table statement, and MySQL will check if there is a conflict between the two.
   ```
   mysql> create table t1(a int not null) as select 1 as a;
   Query OK, 1 row affected (0.06 sec)
   Records: 1  Duplicates: 0  Warnings: 0
   
   mysql> describe t1;
   +-------+------+------+-----+---------+-------+
   | Field | Type | Null | Key | Default | Extra |
   +-------+------+------+-----+---------+-------+
   | a     | int  | NO   |     | NULL    |       |
   +-------+------+------+-----+---------+-------+
   1 row in set (0.00 sec)
   
   
   mysql> create table t2(a int not null) as select null as a;
   ERROR 1048 (23000): Column 'a' cannot be null
   
   ```
   
   
   ## PostgreSQL
   `SELECT INTO` and `CREATE TABLE AS` statements do not support specifying column constraints.
   It seems that the schema of the table created through these statements is always nullable.
   ```
   postgres => create table t1(a int not null);
   CREATE TABLE
   postgres => create table t2 as select * from t1;
   SELECT 0
   postgres => \d t2
                    Table "public.t2"
    Column |  Type   | Collation | Nullable | Default
   --------+---------+-----------+----------+---------
    a      | integer |           |          |
   
   postgres=> select * into t3 from t1;
   SELECT 0
   postgres=> \d t3
                    Table "public.t3"
    Column |  Type   | Collation | Nullable | Default
   --------+---------+-----------+----------+---------
    a      | integer |           |          |
   ```
   
   
   ## DataFusion
   > Now that we can add new data to MemTables, what do you think about ensuring the schema of the MemTable when created via INSERT ... is nullable?
   
   I think it is ok, just like PostgreSQL.
   
   But we should reject the following statement:
   ```
   DataFusion CLI v25.0.0
   ❯ create table t(a int not null) as select null as a;
   0 rows in set. Query took 0.005 seconds.
   ❯ select * from t;
   +---+
   | a |
   +---+
   |   |
   +---+
   1 row in set. Query took 0.005 seconds.
   ```
   The user specified that the column should not be nullable, but this specification did not take effect. 
   
   # 2. What kind of data can be inserted into MemoryTable
   I think comparing whether the two schemas are exactly the same may be too strict.
   Declaring a column as nullable doesn't necessarily mean that the actual data contains NULLs.
   
   The following example runs successfully in PostgreSQL, but not in DataFusion.
   ```
   postgres=> create table foo(a int, b int);
   CREATE TABLE
   postgres=> insert into foo values(1,10), (2, NULL);
   INSERT 0 2
   postgres=> create table bar(b int not null);
   CREATE TABLE
   
   postgres=> insert into bar select b from foo where a=1;
   INSERT 0 1
   postgres=> insert into bar select b from foo where a=2;
   ERROR:  null value in column "b" of relation "bar" violates not-null constraint
   DETAIL:  Failing row contains (null).
   ```
   We should verify the resulting data after executing the input's execution plan.
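   
   A minimal sketch of such a data-level check (this is not the actual DataFusion insert path; the function name is made up, and the batch columns are assumed to be positionally aligned with the target schema):
   ```rust
   use arrow::array::Array;
   use arrow::datatypes::Schema;
   use arrow::record_batch::RecordBatch;
   
   // Instead of rejecting a nullable input schema outright, inspect the produced batch
   // and fail only when a non-nullable target column actually contains NULL values.
   fn check_not_null_constraints(batch: &RecordBatch, target: &Schema) -> Result<(), String> {
       for (i, field) in target.fields().iter().enumerate() {
           if !field.is_nullable() && batch.column(i).null_count() > 0 {
               return Err(format!(
                   "null value in column \"{}\" violates not-null constraint",
                   field.name()
               ));
           }
       }
       Ok(())
   }
   ```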




[GitHub] [arrow-datafusion] berkaysynnada commented on issue #6492: Mismatch in MemTable (Select Into with aggregate window functions having no alias)

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on issue #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492#issuecomment-1576553280

   > In the `create_memory_table()` function, there is a match arm that handles the case in which the table does not exist. In that case, we initialize the MemTable with `try_new()`, comparing the fields one-to-one. For these unregistered, newly created tables, do we really need to check the LogicalPlan schema against the schema coming from the partitions? By implementing a `MemTable::new_not_registered()` function, could we directly adopt the schema coming from the partitions? In case of empty batches (CREATE TABLE statements without inserted values), we can use the input plan's schema.
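   
   A small sketch of that schema selection (the helper name is made up):
   ```rust
   use arrow::datatypes::SchemaRef;
   use arrow::record_batch::RecordBatch;
   
   // Prefer the schema of the batches themselves; fall back to the logical plan's
   // schema when the statement produced no batches (e.g. CREATE TABLE without values).
   fn schema_for_new_table(plan_schema: SchemaRef, partitions: &[Vec<RecordBatch>]) -> SchemaRef {
       partitions
           .iter()
           .flatten()
           .next()
           .map(|batch| batch.schema())
           .unwrap_or(plan_schema)
   }
   ```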




[GitHub] [arrow-datafusion] alamb commented on issue #6492: Mismatch in MemTable (Select Into with aggregate window functions having no alias)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492#issuecomment-1573999728

   I think the proper solution is to update the planner code to project (e.g. with a `LogicalPlan::Projection`) the output of the query into the types of the target table.
   
   In the example https://github.com/apache/arrow-datafusion/issues/6492#issuecomment-1573076672 from @byteink, the table's schema says "it can't have nulls" (because it is marked as non-nullable), so in my opinion DataFusion is correctly rejecting this plan.
   
   I think this was not previously a problem because MemoryTables could not be updated, so if the initial contents didn't contain nulls, they never would contain nulls (and thus that could be reflected in the schema).
   
   Now that we can add new data to MemTables, what do you think about ensuring the schema of the MemTable when created via `INSERT ...` is nullable?
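   
   A minimal sketch of what "make it nullable" could look like (this is not existing DataFusion code; field metadata and dictionary settings are ignored for brevity):
   ```rust
   use std::sync::Arc;
   use arrow::datatypes::{Field, Schema, SchemaRef};
   
   // Relax every field of a query-output schema to nullable before using it as the
   // schema of a MemTable that may later receive inserted data.
   fn as_nullable(schema: &SchemaRef) -> SchemaRef {
       let fields: Vec<Field> = schema
           .fields()
           .iter()
           .map(|f| Field::new(f.name().to_string(), f.data_type().clone(), true))
           .collect();
       Arc::new(Schema::new(fields))
   }
   ```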
   




[GitHub] [arrow-datafusion] berkaysynnada commented on issue #6492: Mismatch in MemTable (Select Into with aggregate window functions having no alias)

Posted by "berkaysynnada (via GitHub)" <gi...@apache.org>.
berkaysynnada commented on issue #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492#issuecomment-1570343563

   @alamb, when you have some time, could you please review my suggestions? I'm curious about your thoughts on how to solve this issue.




[GitHub] [arrow-datafusion] alamb commented on issue #6492: Mismatch in MemTable (Select Into with aggregate window functions having no alias)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492#issuecomment-1572539079

   Hi @berkaysynnada  -- 
   
   Here is a small reproducer:
   ```sql
   ❯ create table foo (x int) as values (1);
   0 rows in set. Query took 0.001 seconds.
   ❯ explain select first_value(x) OVER () INTO bar from foo ;
   +--------------+------------------------------------------------------------------------------------------------------------+
   | plan_type    | plan                                                                                                       |
   +--------------+------------------------------------------------------------------------------------------------------------+
   | logical_plan | CreateMemoryTable: Bare { table: "bar" }                                                                   |
   |              |   Projection: FIRST_VALUE(foo.x) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING                  |
   |              |     WindowAggr: windowExpr=[[FIRST_VALUE(foo.x) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
   |              |       TableScan: foo projection=[x]                                                                        |
   +--------------+------------------------------------------------------------------------------------------------------------+
   1 row in set. Query took 0.001 seconds.
   ❯  select first_value(x) OVER () INTO bar from foo ;
   Error during planning: Mismatch between schema and batches
   ```
   
   
   I wonder if you can solve this with an explicit alias rather than changing how the display names work.
   
   So when the `CreateMemoryTable` is planned, add explicit aliases, so that the plan for `select first_value(x) OVER () INTO bar from foo;` would look something like:
   ```
   +--------------+------------------------------------------------------------------------------------------------------------+
   | plan_type    | plan                                                                                                       |
   +--------------+------------------------------------------------------------------------------------------------------------+
   | logical_plan | CreateMemoryTable: Bare { table: "bar" }                                                                   |
   |              |   Projection: FIRST_VALUE(foo.x) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS "first_value(x)"   <---- add this alias?? |
   |              |     WindowAggr: windowExpr=[[FIRST_VALUE(foo.x) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
   |              |       TableScan: foo projection=[x]                                                                        |
   +--------------+------------------------------------------------------------------------------------------------------------+
   ```
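   
   A rough sketch of that alias idea (this is not the actual planner change; the alias string is just the short name from the example above, and the real code would derive it from the expression):
   ```rust
   use datafusion_common::Result;
   use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
   
   // Re-project the window expression under an explicit, shorter alias so the field name
   // in the logical schema matches the name the physical planner gives the batch column.
   fn alias_window_output(input: LogicalPlan, window_expr: Expr) -> Result<LogicalPlan> {
       LogicalPlanBuilder::from(input)
           .project(vec![window_expr.alias("first_value(x)")])?
           .build()
   }
   ```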
   
   For the record, here is what postgres does:
   
   ```shell
   postgres=# select first_value(x) OVER () INTO bar from foo ;
   SELECT 1
   postgres=# select * from bar;
    first_value
   -------------
              1
   (1 row)
   ```

