Posted to dev@flink.apache.org by "Niels Basjes (JIRA)" <ji...@apache.org> on 2016/11/07 10:19:59 UTC

[jira] [Created] (FLINK-5025) Job fails because of Optimizer bug

Niels Basjes created FLINK-5025:
-----------------------------------

             Summary: Job fails because of Optimizer bug
                 Key: FLINK-5025
                 URL: https://issues.apache.org/jira/browse/FLINK-5025
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.1.3
            Reporter: Niels Basjes


I have a batch job that fails with the following error message when I run it:

{code}
org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
	at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
	at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
	at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
	at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
	at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
	at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
	at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
	at com.bol.reproduce.flink.Main.run(Main.java:42)
	at com.bol.reproduce.flink.Main.main(Main.java:21)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
{code}

The smallest code snippet I have been able to create that reproduces this problem is shown below.
Note that the error does not occur when only a single union is used.

{code}
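import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public class Main implements Serializable {
  public static void main(String[] args) throws Exception {
    System.exit(new Main().run());
  }

  private int run() throws IOException {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Three inputs combined with two chained unions; a single union does not trigger the error.
    final DataSet<String> lines =
               env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist")))
        .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))))
        .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))));

    // Redistribute the unioned data and collect it into a local list.
    List<String> allLines = new ArrayList<>();
    lines
      .rebalance()
      .output(new LocalCollectionOutputFormat<>(allLines));

    // Execute the program; the CompilerException is thrown from this call during plan compilation.
    try {
      env.execute("Running");
    } catch (Exception e) {
      e.printStackTrace();
    }
    return 0;
  }
}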
{code}