Posted to github@arrow.apache.org by "stuartcarnie (via GitHub)" <gi...@apache.org> on 2023/04/12 05:12:44 UTC

[GitHub] [arrow-datafusion] stuartcarnie opened a new issue, #5970: UNION ALL with ORDER BY results are inconsistent

stuartcarnie opened a new issue, #5970:
URL: https://github.com/apache/arrow-datafusion/issues/5970

   ### Describe the bug
   
   ### DataFusion CLI (18.0)
   
   Results were consistently sorted correctly.
   
   ```sql
   WITH
     m0(time, tag0, f64) AS (VALUES (1667181600000000000, 'val00', 10.1), (1667181610000000000, 'val00', 21.2), (1667181620000000000, 'val00', 11.2), (1667181630000000000, 'val00', 19.2), (1667181600000000000, 'val01', 11.3), (1667181600000000000, 'val02', 10.4), (1667181610000000000, 'val00', 18.9)),
   
     m1(time, tag0, f64) AS (VALUES (1667181600000000000, 'val00', 100.5), (1667181610000000000, 'val00', 200.6), (1667181600000000000, 'val01', 101.7)),
   
     t AS (
       SELECT 'm0' as "iox::measurement", tag0, 0::timestamp as time, COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY 1, 2, 3), 
   
     u AS (
       SELECT 'm1' as "iox::measurement", tag0, 0::timestamp as time, COUNT(f64), SUM(f64), stddev(f64) FROM m1 GROUP BY 1, 2, 3)
   SELECT * FROM t
   UNION ALL
   SELECT * FROM u
   ORDER BY 1, 2, 3;
   ```
   
   Output is correct:
   
   ```text
   +------------------+-------+---------------------+------------+----------+-------------------+
   | iox::measurement | tag0  | time                | COUNT(f64) | SUM(f64) | STDDEV(f64)       |
   +------------------+-------+---------------------+------------+----------+-------------------+
   | m0               | val00 | 1970-01-01T00:00:00 | 5          | 80.6     | 5.085961069453836 |
   | m0               | val01 | 1970-01-01T00:00:00 | 1          | 11.3     |                   |
   | m0               | val02 | 1970-01-01T00:00:00 | 1          | 10.4     |                   |
   | m1               | val00 | 1970-01-01T00:00:00 | 2          | 301.1    | 70.7813887967734  |
   | m1               | val01 | 1970-01-01T00:00:00 | 1          | 101.7    |                   |
   +------------------+-------+---------------------+------------+----------+-------------------+
   ```
   
   
   ### DataFusion CLI (20.0)
   
   Results were not consistent; running the same query sometimes produces incorrectly sorted output:
   
   ```text
   +------------------+-------+---------------------+------------+----------+-------------------+
   | iox::measurement | tag0  | time                | COUNT(f64) | SUM(f64) | STDDEV(f64)       |
   +------------------+-------+---------------------+------------+----------+-------------------+
   | m0               | val00 | 1970-01-01T00:00:00 | 5          | 80.6     | 5.085961069453836 |
   | m0               | val01 | 1970-01-01T00:00:00 | 1          | 11.3     |                   |
   | m1               | val00 | 1970-01-01T00:00:00 | 2          | 301.1    | 70.7813887967734  |
   | m0               | val02 | 1970-01-01T00:00:00 | 1          | 10.4     |                   |
   | m1               | val01 | 1970-01-01T00:00:00 | 1          | 101.7    |                   |
   +------------------+-------+---------------------+------------+----------+-------------------+
   ```
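   
   One way to confirm that the second output violates `ORDER BY 1, 2, 3` is to check that every adjacent pair of rows is non-decreasing on the sort key. The sketch below does this for the first three columns of the two tables above; `is_sorted_on_sort_key` is a hypothetical helper for illustration, not a DataFusion API.
   
   ```rust
   /// Returns true if the rows are in non-decreasing lexicographic order on
   /// (iox::measurement, tag0, time), which is what `ORDER BY 1, 2, 3` requires.
   fn is_sorted_on_sort_key(rows: &[(&str, &str, &str)]) -> bool {
       rows.windows(2).all(|pair| pair[0] <= pair[1])
   }
   
   fn main() {
       let t = "1970-01-01T00:00:00";
       // Sort-key columns of the correct 18.0 output.
       let v18 = [("m0", "val00", t), ("m0", "val01", t), ("m0", "val02", t), ("m1", "val00", t), ("m1", "val01", t)];
       // Sort-key columns of the incorrect 20.0 output: ("m1", "val00") precedes ("m0", "val02").
       let v20 = [("m0", "val00", t), ("m0", "val01", t), ("m1", "val00", t), ("m0", "val02", t), ("m1", "val01", t)];
       assert!(is_sorted_on_sort_key(&v18));
       assert!(!is_sorted_on_sort_key(&v20));
   }
   ```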
   
   ### To Reproduce
   
   _No response_
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on issue #5970: UNION ALL with ORDER BY results are inconsistent

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on issue #5970:
URL: https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1508262463

   My suggestion is to make the partition-aware optimization a configuration option rather than removing the code.
   It would be turned off by default, so that `UnionExec` behaves like a plain union and does not keep the partition info.


[GitHub] [arrow-datafusion] alamb closed issue #5970: UNION ALL with ORDER BY results are inconsistent

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed issue #5970: UNION ALL with ORDER BY results are inconsistent
URL: https://github.com/apache/arrow-datafusion/issues/5970


[GitHub] [arrow-datafusion] stuartcarnie commented on issue #5970: UNION ALL with ORDER BY results are inconsistent

Posted by "stuartcarnie (via GitHub)" <gi...@apache.org>.
stuartcarnie commented on issue #5970:
URL: https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1507602460

   > > I would argue that UnionExec should NEVER modify its inputs but just be a plain, simple node that forwards its inputs w/o messing up sorting (or any other property).
   > 
   > I agree with this sentiment -- we already have `RepartitionExec` that concatenates batches from different streams
   
   Does it mean that the `UnionExec` will return the inputs sequentially (i.e. concatenated) or will it potentially interleave the inputs, whilst maintaining the ordering?


[GitHub] [arrow-datafusion] crepererum commented on issue #5970: UNION ALL with ORDER BY results are inconsistent

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on issue #5970:
URL: https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1508272242

   > My suggestion is making the partition-aware optimization a configure option, but do not remove the code.
   > By default, we turn it off. So that the UnionExec just like a plain Union and do not keep the partition info.
   
   But that also means that if a user turns it on, DataFusion may produce incorrect results. I'm not sure this is a desired feature then.


[GitHub] [arrow-datafusion] crepererum commented on issue #5970: UNION ALL with ORDER BY results are inconsistent

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on issue #5970:
URL: https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1508165252

   > > > I would argue that UnionExec should NEVER modify its inputs but just be a plain, simple node that forwards its inputs w/o messing up sorting (or any other property).
   > > 
   > > 
   > > I agree with this sentiment -- we already have `RepartitionExec` that concatenates batches from different streams
   > 
   > Does it mean that the `UnionExec` will return the inputs sequentially (i.e. concatenated) or will it potentially interleave the inputs, whilst maintaining the ordering?
   
   So here is what it SHOULD do (IMHO), and what it also does in the majority of cases (i.e. when `partition_aware = false`):
   
   ```yaml
   ---
   # Plan:
   UnionExec:
     children:
       - SomeChild:
           output_partitions:
             - [batch111, batch112]
             - [batch121, batch122]
       - SomeChild:
           output_partitions:
             - [batch211, batch212]
             - [batch221, batch222]
   
   ---
   # Equivalent pseudo-plan
   UnionExec:
     output_partitions:
       - [batch111, batch112]
       - [batch121, batch122]
       - [batch211, batch212]
       - [batch221, batch222]
   ```
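   
   The plain union sketched above amounts to concatenating the children's partition lists without ever merging two partitions. Below is a minimal model, assuming a partition can be represented as a `Vec` of batch names; `plain_union` is an illustrative function, not DataFusion code.
   
   ```rust
   /// A partition is modeled as the list of batch names it yields, in order.
   type Partition = Vec<&'static str>;
   
   /// Plain union: the output partitions are the children's partitions,
   /// forwarded one-to-one, so each partition's internal order is kept.
   fn plain_union(children: Vec<Vec<Partition>>) -> Vec<Partition> {
       children.into_iter().flatten().collect()
   }
   
   fn main() {
       let child1 = vec![vec!["batch111", "batch112"], vec!["batch121", "batch122"]];
       let child2 = vec![vec!["batch211", "batch212"], vec!["batch221", "batch222"]];
       let out = plain_union(vec![child1, child2]);
       assert_eq!(out.len(), 4); // 2 + 2 output partitions
       assert_eq!(out[2], vec!["batch211", "batch212"]);
   }
   ```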
   
   However, this is what it actually does with `partition_aware = true`:
   
   ```yaml
   ---
   UnionExec:
     output_partitions:
       - CombinedRecordBatchStream:
         - [batch111, batch112]
         - [batch211, batch212]
       - CombinedRecordBatchStream:
         - [batch121, batch122]
         - [batch221, batch222]
   
   ---
   # May yield (if lucky):
   UnionExec:
     output_partitions:
       - [batch111, batch112, batch211, batch212]
       - [batch121, batch122, batch221, batch222]
   
   ---
   # May also yield (if not so lucky):
   UnionExec:
     output_partitions:
       - [batch111, batch211, batch112, batch212]
       - [batch221, batch222, batch121, batch122]
   ```
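   
   Whether the "lucky" or "not so lucky" output appears depends on which input happens to be ready when the combined stream is polled. The sketch below replaces that timing with an explicit, hypothetical `schedule` (a list of input indices) to show how the same two inputs can produce either output above. It is a simplified model, not the actual `CombinedRecordBatchStream` implementation.
   
   ```rust
   use std::collections::VecDeque;
   
   /// Simplified model of a combined stream: `schedule` says which input is
   /// "ready" at each step. Exhausted inputs are skipped; batches still left
   /// once the schedule runs out are drained in input order.
   fn combine(inputs: Vec<Vec<&'static str>>, schedule: &[usize]) -> Vec<&'static str> {
       let mut queues: Vec<VecDeque<&'static str>> = inputs.into_iter().map(VecDeque::from).collect();
       let mut out = Vec::new();
       for &i in schedule {
           if let Some(batch) = queues[i].pop_front() {
               out.push(batch);
           }
       }
       for q in queues.iter_mut() {
           out.extend(q.drain(..));
       }
       out
   }
   
   fn main() {
       let inputs = || vec![vec!["batch111", "batch112"], vec!["batch211", "batch212"]];
       // Lucky: the first input is drained before the second becomes ready.
       assert_eq!(combine(inputs(), &[0, 0, 1, 1]), ["batch111", "batch112", "batch211", "batch212"]);
       // Not so lucky: readiness alternates, so batches from both inputs interleave.
       assert_eq!(combine(inputs(), &[0, 1, 0, 1]), ["batch111", "batch211", "batch112", "batch212"]);
   }
   ```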
   
   The exact logic of `CombinedRecordBatchStream` can be found here:
   
   https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/union.rs#L364-L408
   
   This shuffling obviously confuses the `SortPreservingMergeExec` logic, because it assumes that each of its input partitions is sorted.
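   
   In essence, a sort-preserving merge is a streaming k-way merge that only ever compares the current head of each input partition, which is why every partition must already be sorted. A minimal model of such a merge, using `(m, t)` tuples as rows; `merge_sorted_partitions` is a hypothetical helper, and the partition contents in `main` are made up for illustration rather than taken from the issue.
   
   ```rust
   use std::cmp::Reverse;
   use std::collections::BinaryHeap;
   
   /// Streaming k-way merge: a min-heap holds the current head of each partition.
   /// The output is only sorted if every input partition is sorted.
   fn merge_sorted_partitions(partitions: &[Vec<(i64, i64)>]) -> Vec<(i64, i64)> {
       let mut heap = BinaryHeap::new();
       let mut cursors = vec![0usize; partitions.len()];
       for (p, rows) in partitions.iter().enumerate() {
           if let Some(&row) = rows.first() {
               heap.push(Reverse((row, p)));
           }
       }
       let mut out = Vec::new();
       while let Some(Reverse((row, p))) = heap.pop() {
           out.push(row);
           cursors[p] += 1;
           if let Some(&next) = partitions[p].get(cursors[p]) {
               heap.push(Reverse((next, p)));
           }
       }
       out
   }
   
   fn is_sorted(rows: &[(i64, i64)]) -> bool {
       rows.windows(2).all(|w| w[0] <= w[1])
   }
   
   fn main() {
       // Every partition sorted: the merged output is sorted.
       let good = vec![vec![(0, 0), (0, 2), (1, 0)], vec![(0, 1), (1, 1)]];
       assert!(is_sorted(&merge_sorted_partitions(&good)));
       // Second partition interleaves two sorted streams and is NOT sorted.
       let bad = vec![vec![(0, 0), (0, 2)], vec![(1, 0), (0, 1), (1, 1)]];
       let merged = merge_sorted_partitions(&bad);
       println!("{merged:?}"); // prints [(0, 0), (0, 2), (1, 0), (0, 1), (1, 1)]
       assert!(!is_sorted(&merged));
   }
   ```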


[GitHub] [arrow-datafusion] crepererum commented on issue #5970: UNION ALL with ORDER BY results are inconsistent

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on issue #5970:
URL: https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1507189056

   So the plan seems right but is actually wrong:
   
   The two `SortExec`s in the above plan have two partitions each (= 4 in total). They are [hash-partitioned](https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/mod.rs#L471-L473) because this is what you get from the group-by operation.
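   
   Hash partitioning routes each row to partition `hash(key) % n`, so the rows of one group-by output are spread across partitions by key, not by sort order. A minimal sketch of that routing, using Rust's `DefaultHasher` as a stand-in; DataFusion uses its own hash function, so the actual assignment will differ, and `hash_partition` is a hypothetical helper.
   
   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::hash::{Hash, Hasher};
   
   /// Route each (m, t) group key to partition `hash(key) % n`.
   /// Stand-in hasher: this is NOT DataFusion's hash function.
   fn hash_partition(rows: &[(i64, i64)], n: usize) -> Vec<Vec<(i64, i64)>> {
       let mut parts = vec![Vec::new(); n];
       for &row in rows {
           let mut h = DefaultHasher::new();
           row.hash(&mut h);
           parts[(h.finish() % n as u64) as usize].push(row);
       }
       parts
   }
   
   fn main() {
       let rows = [(0, 0), (0, 1), (0, 2)];
       let parts = hash_partition(&rows, 2);
       // Every row lands in exactly one of the 2 partitions.
       assert_eq!(parts.len(), 2);
       assert_eq!(parts.iter().map(Vec::len).sum::<usize>(), rows.len());
       println!("{parts:?}");
   }
   ```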
   
   The `UnionExec` thinks it is "partition aware" (whatever that means):
   
   https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/union.rs#L157-L168
   
   As a consequence, it only emits 2 output partitions (instead of 4) and concatenates the input data:
   
   https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/union.rs#L276-L292
   
   Hence, it no longer preserves sorting. However, the `EnforceSorting` optimizer pass thinks it can safely push `SortExec` through `UnionExec`:
   
   ```text
   [2023-04-13T15:26:06Z TRACE datafusion::physical_plan::planner] Optimized physical plan by EnforceDistribution:
       SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
         SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
           UnionExec
             ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
               AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]
                 RepartitionExec: partitioning=Hash([Column { name: "Int64(0)", index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
                   AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]
                     RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                       ProjectionExec: expr=[column1@0 as t]
                         ValuesExec
             ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
               AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]
                 RepartitionExec: partitioning=Hash([Column { name: "Int64(1)", index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
                   AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]
                     RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                       ProjectionExec: expr=[column1@0 as t]
                         ValuesExec
   
   
   [2023-04-13T15:26:06Z TRACE datafusion::physical_plan::planner] Optimized physical plan by EnforceSorting:
       SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
         UnionExec
           SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
             ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
               AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]
                 RepartitionExec: partitioning=Hash([Column { name: "Int64(0)", index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
                   AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]
                     RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                       ProjectionExec: expr=[column1@0 as t]
                         ValuesExec
           SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
             ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
               AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]
                 RepartitionExec: partitioning=Hash([Column { name: "Int64(1)", index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
                   AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]
                     RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                       ProjectionExec: expr=[column1@0 as t]
                         ValuesExec
   ```
   
   Now we could either blame `EnforceSorting` for performing the wrong optimization, or `UnionExec` for having a life of its own while seemingly being a really simple node (who would have expected that it concatenates input batches from different input partitions?!). I would argue that `UnionExec` should NEVER modify its inputs but just be a plain, simple node that forwards its inputs w/o messing up sorting (or any other property).
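   
   This argument can be phrased as a property of how a union maps input partitions to output partitions: when the union only forwards streams (no order-aware merge), per-partition ordering is guaranteed only if each output partition is fed by at most one input partition. A hypothetical check; the `mapping` representation is illustrative, not a DataFusion structure.
   
   ```rust
   /// `mapping[i]` lists the (child index, child partition) pairs feeding output
   /// partition i. Returns true if every output partition forwards at most one
   /// input partition, so sorted inputs stay sorted.
   fn preserves_input_ordering(mapping: &[Vec<(usize, usize)>]) -> bool {
       mapping.iter().all(|sources| sources.len() <= 1)
   }
   
   fn main() {
       // Plain union of two children with 2 partitions each: 4 outputs, one source each.
       let plain = vec![vec![(0, 0)], vec![(0, 1)], vec![(1, 0)], vec![(1, 1)]];
       // Partition-aware union: output i combines partition i of both children.
       let partition_aware = vec![vec![(0, 0), (1, 0)], vec![(0, 1), (1, 1)]];
       assert!(preserves_input_ordering(&plain));
       assert!(!preserves_input_ordering(&partition_aware));
   }
   ```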
   
   CC @mustafasrepo who authored #5661 (= making `EnforceSorting` smarter) and @mingmwang who taught `UnionExec` to be "partition aware" in #4043.


[GitHub] [arrow-datafusion] crepererum commented on issue #5970: UNION ALL with ORDER BY results are inconsistent

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on issue #5970:
URL: https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1506882436

   I couldn't get datafusion-cli to reproduce it, but I think that largely depends on the plan and the number of target partitions / CPUs. So here's a code test that reproduces the issue on my system as well:
   
   
   ```rust
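   // Assumes this lives in DataFusion's SQL integration tests (`datafusion/core/tests/sql`),
   // where `execute_to_batches` and the other helpers/imports are in scope via `use super::*;`.
   #[tokio::test(flavor = "multi_thread", worker_threads = 2)]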
   async fn test_issue5970_mini() -> Result<()> {
       let config = SessionConfig::new().with_target_partitions(2).with_repartition_sorts(true);
       let ctx = SessionContext::with_config(config);
       let sql = "
   WITH
       m0(t) AS (
           VALUES (0), (1), (2)),
   
       m1(t) AS (
           VALUES (0), (1)),
   
       u AS (
           SELECT 0 as m, t FROM m0 GROUP BY 1, 2), 
   
       v AS (
           SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
   SELECT * FROM u
   UNION ALL
   SELECT * FROM v
   ORDER BY 1, 2;
       ";
   
       // check the physical plan
       let dataframe = ctx.sql(sql).await.unwrap();
       let plan = dataframe.into_optimized_plan().unwrap();
       let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
       let expected = vec![
           "SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
           "  UnionExec",
           "    SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
           "      ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
           "        AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
           "          CoalesceBatchesExec: target_batch_size=8192",
           "            RepartitionExec: partitioning=Hash([Column { name: \"Int64(0)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2",
           "              AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
           "                RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
           "                  ProjectionExec: expr=[column1@0 as t]",
           "                    ValuesExec",
           "    SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
           "      ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
           "        AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
           "          CoalesceBatchesExec: target_batch_size=8192",
           "            RepartitionExec: partitioning=Hash([Column { name: \"Int64(1)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2",
           "              AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
           "                RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
           "                  ProjectionExec: expr=[column1@0 as t]",
           "                    ValuesExec",
       ];
       let formatted = displayable(plan.as_ref()).indent().to_string();
       let actual: Vec<&str> = formatted.trim().lines().collect();
       assert_eq!(
           expected, actual,
           "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
       );
   
       // sometimes it "just works", so run the query several times
       for i in 0..10 {
           println!("run: {i}");
           let actual = execute_to_batches(&ctx, sql).await;
           let expected = vec![
               "+---+---+",
               "| m | t |",
               "+---+---+",
               "| 0 | 0 |",
               "| 0 | 1 |",
               "| 0 | 2 |",
               "| 1 | 0 |",
               "| 1 | 1 |",
               "+---+---+",
           ];
           assert_batches_eq!(expected, &actual);
       }
       Ok(())
   }
   ```
   
   results in:
   
   ```text
   expected:
   
   [
       "+---+---+",
       "| m | t |",
       "+---+---+",
       "| 0 | 0 |",
       "| 0 | 1 |",
       "| 0 | 2 |",
       "| 1 | 0 |",
       "| 1 | 1 |",
       "+---+---+",
   ]
   actual:
   
   [
       "+---+---+",
       "| m | t |",
       "+---+---+",
       "| 0 | 0 |",
       "| 0 | 2 |",
       "| 1 | 0 |",
       "| 0 | 1 |",
       "| 1 | 1 |",
       "+---+---+",
   ]
   ```
   
   To me, the physical plan looks fine. I guess either `SortExec` or `SortPreservingMergeExec` is buggy. The issue was bisected to #5661, which makes use of `SortPreservingMergeExec`, so my current hypothesis is that this is the buggy one.


[GitHub] [arrow-datafusion] crepererum commented on issue #5970: UNION ALL with ORDER BY results are inconsistent

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on issue #5970:
URL: https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1506764270

   I'm working on this.


[GitHub] [arrow-datafusion] alamb commented on issue #5970: UNION ALL with ORDER BY results are inconsistent

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #5970:
URL: https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1507210583

   > I would argue that UnionExec should NEVER modify its inputs but just be a plain, simple node that forwards its inputs w/o messing up sorting (or any other property).
   
   I agree with this sentiment -- we already have `RepartitionExec` that concatenates batches from different streams

