You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Maximilian Michels (JIRA)" <ji...@apache.org> on 2018/10/18 17:58:00 UTC
[jira] [Commented] (FLINK-10566) Flink Planning is exponential in
the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655681#comment-16655681 ]
Maximilian Michels commented on FLINK-10566:
--------------------------------------------
Here's a thread dump:
{noformat}
....
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:220)
at org.apache.flink.api.common.Plan.accept(Plan.java:333)
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:964)
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:85)
at SlowSchedulingTest.runPipeline(SlowSchedulingTest.java:48)
at SlowSchedulingTest.main(SlowSchedulingTest.java:34)
{noformat}
> Flink Planning is exponential in the number of stages
> -----------------------------------------------------
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 1.5.4
> Reporter: Robert Bradshaw
> Priority: Major
> Attachments: chart.png
>
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The execution itself is still sub-second, but the job submission takes increasingly long.)
> I can reproduce this with the following pipeline, which resembles my real-world workloads (with depth up to 10 and width up, and past, 50). On Flink it seems getting width beyond width 10 is problematic (times out after hours). Note the log scale on the chart for time.
>
> {code:java}
> public static void runPipeline(int depth, int width) throws Exception {
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> input = env.fromElements("a", "b", "c");
> DataSet<String> stats = null;
> for (int i = 0; i < depth; i++) {
> stats = analyze(input, stats, width / (i + 1) + 1);
> }
> stats.writeAsText("out.txt");
> env.execute("depth " + depth + " width " + width);
> }
> public static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
> System.out.println("analyze " + branches);
> for (int i = 0; i < branches; i++) {
> final int ii = i;
> if (stats != null) {
> input = input.map(new RichMapFunction<String, String>() {
> @Override
> public void open(Configuration parameters) throws Exception {
> Collection<String> broadcastSet = getRuntimeContext().getBroadcastVariable("stats");
> }
> @Override
> public String map(String value) throws Exception {
> return value;
> }
> }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
> }
> DataSet<String> branch = input
> .map(s -> new Tuple2<Integer, String>(0, s + ii))
> .groupBy(0)
> .minBy(1)
> .map(kv -> kv.f1);
> if (stats == null) {
> stats = branch;
> } else {
> stats = stats.union(branch);
> }
> }
> return stats.map(s -> "(" + s + ").stats");
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)