Posted to user@flink.apache.org by Yassine MARZOUGUI <y....@mindlytix.com> on 2016/10/25 10:33:40 UTC
Bug: Plan generation for Unions picked a ship strategy between binary
plan operators.
Hi all,
My job fails with the following exception: CompilerException: Bug: Plan
generation for Unions picked a ship strategy between binary plan operators.
The exception happens when adding partitionByRange(1).sortPartition(1,
Order.DESCENDING) to the union of datasets.
I made a smaller version that reproduces the bug:
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.Iterator;

public class BugReproduce {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<WC> wc1 = env.fromElements(new WC("first", 1), new WC("second", 2),
                new WC("first", 1), new WC("first", 1), new WC("second", 2));
        DataSet<WC> wc2 = env.fromElements(new WC("third", 1), new WC("forth", 2),
                new WC("forth", 1), new WC("forth", 1), new WC("third", 2));
        DataSet<WC> wc3 = env.fromElements(new WC("fifth", 1), new WC("fifth", 2),
                new WC("fifth", 1), new WC("fifth", 1), new WC("fifth", 2));

        DataSet<Tuple2<String, Integer>> aggregatedwc1 = aggregateWC(wc1);
        DataSet<Tuple2<String, Integer>> aggregatedwc2 = aggregateWC(wc2);
        DataSet<Tuple2<String, Integer>> aggregatedwc3 = aggregateWC(wc3);
        DataSet<Tuple2<String, Integer>> all =
                aggregatedwc1.union(aggregatedwc2).union(aggregatedwc3);
        all.partitionByRange(1).sortPartition(1, Order.DESCENDING).print();
    }

    public static DataSet<Tuple2<String, Integer>> aggregateWC(DataSet<WC> input) {
        return input.groupBy("word").reduceGroup(
                new GroupReduceFunction<WC, Tuple2<String, Integer>>() {
                    @Override
                    public void reduce(Iterable<WC> iterable,
                            Collector<Tuple2<String, Integer>> collector) throws Exception {
                        Integer count = 0;
                        Iterator<WC> iterator = iterable.iterator();
                        if (iterator.hasNext()) {
                            String word = iterator.next().word;
                            while (iterator.hasNext()) {
                                iterator.next();
                                count += 1;
                            }
                            collector.collect(Tuple2.of(word, count));
                        }
                    }
                });
    }

    public static class WC {
        public String word;
        public int count;

        public WC() {
        }

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }
    }
}
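For reference, here is a minimal plain-Java sketch (no Flink involved) of what the program above is meant to compute, assuming a single partition so that the range partitioning has no visible effect. The class name ReproSketch and its helper methods are hypothetical and only serve as illustration. Note that the reduce function above consumes the first element of each group before it starts counting, so the emitted count is the group size minus one; the sketch mirrors that behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Flink job: same inputs, plain collections.
public class ReproSketch {

    // Mirrors the reduce function: one output per distinct word, with a
    // count of (group size - 1) because the first element is not counted.
    static Map<String, Integer> aggregate(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
        }
        counts.replaceAll((word, size) -> size - 1);
        return counts;
    }

    // Union of all aggregates, sorted descending on the count field.
    // Assumption: a single partition, so this is a plain total sort.
    static List<Map.Entry<String, Integer>> unionSortedDesc(
            List<Map<String, Integer>> parts) {
        List<Map.Entry<String, Integer>> all = new ArrayList<>();
        for (Map<String, Integer> part : parts) {
            all.addAll(part.entrySet());
        }
        all.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return all;
    }

    public static void main(String[] args) {
        Map<String, Integer> a1 = aggregate(
                Arrays.asList("first", "second", "first", "first", "second"));
        Map<String, Integer> a2 = aggregate(
                Arrays.asList("third", "forth", "forth", "forth", "third"));
        Map<String, Integer> a3 = aggregate(
                Arrays.asList("fifth", "fifth", "fifth", "fifth", "fifth"));
        for (Map.Entry<String, Integer> e
                : unionSortedDesc(Arrays.asList(a1, a2, a3))) {
            System.out.println(e.getKey() + "," + e.getValue());
        }
    }
}
```

The order of records with equal counts is an artifact of the stable sort used in this sketch; the Flink job itself makes no such guarantee.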
Here is the exception stacktrace:
Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
    at org.myorg.prod.BugReproduce.main(BugReproduce.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
I'm using Flink v1.1.3. Any help is appreciated. Thank you.
Best,
Yassine
Re: Bug: Plan generation for Unions picked a ship strategy between
binary plan operators.
Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
Hi Fabian,
I commented on the issue and attached the program reproducing the bug, but
I couldn't find how to re-open it (I think I may not have sufficient
permissions).
Best,
Yassine
Re: Bug: Plan generation for Unions picked a ship strategy between
binary plan operators.
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Yassine,
I thought I had fixed that bug a few weeks ago, but apparently the fix
did not catch all cases.
Can you please reopen FLINK-2662 [1] and post the program to reproduce the
bug there?
Thanks,
Fabian
[1] https://issues.apache.org/jira/browse/FLINK-2662