Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/07/23 12:47:00 UTC
[jira] [Closed] (FLINK-7593) Generated plan does not create correct groups
[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske closed FLINK-7593.
--------------------------------
Resolution: Duplicate
After taking a closer look I noticed that this is a duplicate of FLINK-9031.
> Generated plan does not create correct groups
> ---------------------------------------------
>
> Key: FLINK-7593
> URL: https://issues.apache.org/jira/browse/FLINK-7593
> Project: Flink
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 1.3.2
> Environment: Windows 7, Ubuntu 16.04, Flink 1.3.2
> Reporter: Steffen Dienst
> Priority: Critical
> Attachments: flink-good-plan.json
>
>
> Under specific circumstances Flink seems to generate an incorrect execution plan. I was using `groupBy(0).sum(1)`, but the resulting CSV files contained multiple entries per group; the grouping did not occur. After some work I managed to reduce the relevant part of our code to the minimal test case below. Be careful: all parts need to be present, even the seemingly irrelevant secondary output. If I remove anything, Flink generates a correct plan (either by introducing a combiner node prior to the reducer or by using "Sum (combine)" on the edge before the reducer).
> {code:java}
> import java.util.ArrayList;
> import java.util.Collection;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.types.LongValue;
> import org.apache.flink.util.LongValueSequenceIterator;
> public class FlinkOptimizerBug {
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> DataSet<Tuple2<Long, Long>> x =
> env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
> .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))
> .join(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
> .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)))
> .where(0).equalTo(0).with((t1,t2) -> t1)
> .union(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
> .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4,1L)))
> .map(l->l)
> .withForwardedFields("f0;f1");
>
> Collection<Tuple2<Long, Long>> out = new ArrayList<>();
> x.output(new LocalCollectionOutputFormat<>(out));
>
> x.groupBy(0)
> .sum(1) //BUG: this will not be grouped correctly, so there will be multiple outputs per group!
> .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE)
> .setParallelism(1);
> env.setParallelism(4);
>
> System.out.println(env.getExecutionPlan());
> env.execute();
> }
> }
> {code}
> Invalid execution plan generated:
> {code:javascript}
> {
> "nodes": [
> {
> "id": 5,
> "type": "source",
> "pact": "Data Source",
> "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
> "parallelism": "4",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "0.0" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "0.0" },
> { "name": "Cumulative Disk I/O", "value": "0.0" },
> { "name": "Cumulative CPU", "value": "0.0" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 4,
> "type": "pact",
> "pact": "Map",
> "contents": "Map at main(FlinkOptimizerBug.java:24)",
> "parallelism": "4",
> "predecessors": [
> {"id": 5, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "0.0" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "0.0" },
> { "name": "Cumulative Disk I/O", "value": "0.0" },
> { "name": "Cumulative CPU", "value": "0.0" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 7,
> "type": "source",
> "pact": "Data Source",
> "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
> "parallelism": "4",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "0.0" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "0.0" },
> { "name": "Cumulative Disk I/O", "value": "0.0" },
> { "name": "Cumulative CPU", "value": "0.0" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 6,
> "type": "pact",
> "pact": "Map",
> "contents": "Map at main(FlinkOptimizerBug.java:26)",
> "parallelism": "4",
> "predecessors": [
> {"id": 7, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "0.0" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "0.0" },
> { "name": "Cumulative Disk I/O", "value": "0.0" },
> { "name": "Cumulative CPU", "value": "0.0" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 3,
> "type": "pact",
> "pact": "Join",
> "contents": "Join at main(FlinkOptimizerBug.java:27)",
> "parallelism": "4",
> "predecessors": [
> {"id": 4, "side": "first", "ship_strategy": "Hash Partition on [0]", "exchange_mode": "PIPELINED"},
> {"id": 6, "side": "second", "ship_strategy": "Hash Partition on [0]", "exchange_mode": "PIPELINED"}
> ],
> "driver_strategy": "Hybrid Hash (build: Map at main(FlinkOptimizerBug.java:24) (id: 4))",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "(unknown)" },
> { "name": "Disk I/O", "value": "(unknown)" },
> { "name": "CPU", "value": "(unknown)" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 9,
> "type": "source",
> "pact": "Data Source",
> "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
> "parallelism": "4",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "0.0" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "0.0" },
> { "name": "Cumulative Disk I/O", "value": "0.0" },
> { "name": "Cumulative CPU", "value": "0.0" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 8,
> "type": "pact",
> "pact": "Map",
> "contents": "Map at main(FlinkOptimizerBug.java:29)",
> "parallelism": "4",
> "predecessors": [
> {"id": 9, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "0.0" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "0.0" },
> { "name": "Cumulative Disk I/O", "value": "0.0" },
> { "name": "Cumulative CPU", "value": "0.0" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 2,
> "type": "pact",
> "pact": "Union",
> "contents": "",
> "parallelism": "4",
> "predecessors": [
> {"id": 3, "side": "first", "ship_strategy": "Forward", "exchange_mode": "PIPELINED"},
> {"id": 8, "side": "second", "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
> ],
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "0.0" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 1,
> "type": "pact",
> "pact": "Map",
> "contents": "Map at main(FlinkOptimizerBug.java:30)",
> "parallelism": "4",
> "predecessors": [
> {"id": 2, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]", "exchange_mode": "PIPELINED"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "HASH_PARTITIONED" },
> { "name": "Partitioned on", "value": "[0]" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "[0:ASC]" },
> { "name": "Grouped on", "value": "[0]" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "(unknown)" },
> { "name": "Disk I/O", "value": "(unknown)" },
> { "name": "CPU", "value": "(unknown)" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 0,
> "type": "sink",
> "pact": "Data Sink",
> "contents": "org.apache.flink.api.java.io.LocalCollectionOutputFormat@52feb982",
> "parallelism": "4",
> "predecessors": [
> {"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
> ],
> "global_properties": [
> { "name": "Partitioning", "value": "HASH_PARTITIONED" },
> { "name": "Partitioned on", "value": "[0]" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "[0:ASC]" },
> { "name": "Grouped on", "value": "[0]" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "0.0" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 11,
> "type": "pact",
> "pact": "GroupReduce",
> "contents": "SUM(1), at main(FlinkOptimizerBug.java:35",
> "parallelism": "4",
> "predecessors": [
> {"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
> ],
> "driver_strategy": "Sorted Group Reduce",
> "global_properties": [
> { "name": "Partitioning", "value": "HASH_PARTITIONED" },
> { "name": "Partitioned on", "value": "[0]" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "[0:ASC]" },
> { "name": "Grouped on", "value": "[0]" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "0.0" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> },
> {
> "id": 10,
> "type": "sink",
> "pact": "Data Sink",
> "contents": "CsvOutputFormat (path: /tmp/foo, delimiter: ,)",
> "parallelism": "1",
> "predecessors": [
> {"id": 11, "ship_strategy": "Redistribute", "exchange_mode": "PIPELINED"}
> ],
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" } ],
> "costs": [
> { "name": "Network", "value": "(unknown)" },
> { "name": "Disk I/O", "value": "0.0" },
> { "name": "CPU", "value": "0.0" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
> { "name": "Filter Factor", "value": "(none)" } ]
> }
> ]
> }
> {code}
> Output in CSV file:
> {noformat}
> 0,69
> 1,63
> 2,58
> 3,61
> 0,68
> 1,65
> 2,48
> 3,55290
> 0,58
> 1,64
> 2,67885
> 3,61
> 0,61031
> 1,66673
> 2,64
> 3,67
> {noformat}
> Expected output (for example when removing the unnecessary collection output from the code):
> {noformat}
> 0,58096
> 1,70450
> 3,66549
> 2,56882
> {noformat}
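
One way to confirm the symptom described above is to count how many rows each key has in the CSV output. The following is a minimal sketch, not part of the original report: the class name `DuplicateGroupCheck` and its helper methods are hypothetical, and the sample rows are copied from the two outputs quoted in the issue. It assumes each line has the form `key,sum`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not from the issue): checks that a grouped
// aggregation produced exactly one output row per key.
public class DuplicateGroupCheck {

    /** Counts how many rows each key (the first CSV column) has. */
    static Map<String, Integer> rowsPerKey(List<String> csvLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : csvLines) {
            if (line.trim().isEmpty()) {
                continue; // ignore blank lines
            }
            String key = line.split(",", 2)[0].trim();
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    /** Returns true if every key occurs in exactly one row. */
    static boolean isCorrectlyGrouped(List<String> csvLines) {
        return rowsPerKey(csvLines).values().stream().allMatch(n -> n == 1);
    }

    public static void main(String[] args) {
        // Rows from the "Output in CSV file" section of the issue.
        List<String> buggy = Arrays.asList(
                "0,69", "1,63", "2,58", "3,61",
                "0,68", "1,65", "2,48", "3,55290",
                "0,58", "1,64", "2,67885", "3,61",
                "0,61031", "1,66673", "2,64", "3,67");
        // Rows from the "Expected output" section of the issue.
        List<String> expected = Arrays.asList(
                "0,58096", "1,70450", "3,66549", "2,56882");

        System.out.println(rowsPerKey(buggy));           // {0=4, 1=4, 2=4, 3=4}
        System.out.println(isCorrectlyGrouped(buggy));    // false
        System.out.println(isCorrectlyGrouped(expected)); // true
    }
}
```

With the reported output, every key appears four times, which matches the job parallelism of 4 and suggests that each parallel instance emitted its own partial sum per key. The same check could be run against lines read from the job's actual CSV output.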