Posted to issues@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2015/11/20 12:21:11 UTC
[jira] [Created] (FLINK-3052) Optimizer does not push properties out of bulk iterations
Till Rohrmann created FLINK-3052:
------------------------------------
Summary: Optimizer does not push properties out of bulk iterations
Key: FLINK-3052
URL: https://issues.apache.org/jira/browse/FLINK-3052
Project: Flink
Issue Type: Bug
Components: Optimizer
Affects Versions: 0.10.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Fix For: 0.10.1
Flink's optimizer should be able to reuse interesting properties (such as partitioning) that were established outside the loop. To do that, it is sometimes necessary to append a NoOp node to the step function that recomputes the required properties.
This currently does not work for {{BulkIterations}}, because the plans with the appended NoOp nodes are not added to the overall list of candidate plans.
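To make the failure mode concrete, here is a toy model of the candidate enumeration described above. It is a sketch under stated assumptions, not Flink's optimizer code: the class {{BulkIterationCandidates}}, the {{Candidate}} type, the methods {{enumerate}}, {{choose}} and {{exampleStepPlans}}, and all cost values are hypothetical. The model assumes that the loop input arrives partitioned on field 0 from outside the loop, that the step function's output carries no known partitioning, and that a feedback plan is only valid if it matches the loop input's partitioning. With the NoOp variants added, a valid plan exists; when they are dropped, no candidate qualifies and the plan is rejected.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these classes are hypothetical and are NOT Flink's optimizer API.
// It mimics how the feedback of a bulk iteration must match a property
// (here: partitioning on field 0) that was established outside the loop.
public class BulkIterationCandidates {

    // Assumed extra cost of a NoOp node that re-partitions the step result.
    static final double REPARTITION_COST = 5.0;

    static final class Candidate {
        final String description;
        final boolean partitionedOnKey; // does the feedback carry the loop input's partitioning?
        final double cost;

        Candidate(String description, boolean partitionedOnKey, double cost) {
            this.description = description;
            this.partitionedOnKey = partitionedOnKey;
            this.cost = cost;
        }
    }

    // Enumerates feedback candidates; optionally adds variants with an appended NoOp
    // that re-establishes the partitioning required at the loop head.
    static List<Candidate> enumerate(List<Candidate> stepPlans, boolean addNoOpVariants) {
        List<Candidate> all = new ArrayList<>(stepPlans);
        if (addNoOpVariants) {
            for (Candidate c : stepPlans) {
                if (!c.partitionedOnKey) {
                    all.add(new Candidate(c.description + " + NoOp(hash-partition on 0)",
                            true, c.cost + REPARTITION_COST));
                }
            }
        }
        return all;
    }

    // Picks the cheapest candidate whose feedback matches the loop input's partitioning;
    // throws if none does (the toy analogue of rejecting the job).
    static Candidate choose(List<Candidate> candidates) {
        Candidate best = null;
        for (Candidate c : candidates) {
            if (c.partitionedOnKey && (best == null || c.cost < best.cost)) {
                best = c;
            }
        }
        if (best == null) {
            throw new IllegalStateException("no candidate plan satisfies the loop's properties");
        }
        return best;
    }

    // Step function plan with no known output partitioning (an assumption of this toy).
    static List<Candidate> exampleStepPlans() {
        return Arrays.asList(new Candidate("coGroup on field 0", false, 10.0));
    }

    public static void main(String[] args) {
        // Fixed behaviour: the NoOp variant is kept and chosen.
        System.out.println(choose(enumerate(exampleStepPlans(), true)).description);
        // Buggy behaviour: NoOp variants are dropped, so no plan is valid.
        try {
            choose(enumerate(exampleStepPlans(), false));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
{code}

Running {{main}} prints the chosen NoOp variant ("coGroup on field 0 + NoOp(hash-partition on 0)") and then the rejection message for the enumeration that drops the NoOp variants. The toy deliberately leaves out the alternative of not reusing the outer property at all, which a real optimizer would also weigh.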
The bug not only leads to sub-optimal plan selection but sometimes also to the rejection of valid jobs. For example, Flink incorrectly rejects the following job:
{code}
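import java.util.Iterator;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;

public class BulkIterationPropertiesJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Wrap each generated Long into a Tuple1 so that it can be keyed on field 0.
        DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(Long value) throws Exception {
                return new Tuple1<>(value);
            }
        });
        DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(Long value) throws Exception {
                return new Tuple1<>(value);
            }
        });
        // distinct() is computed outside the loop; its properties on field 0 are
        // the ones the optimizer should be able to reuse inside the iteration.
        DataSet<Tuple1<Long>> distinctInput = input1.distinct();
        IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10);
        // Step function: coGroup the partial solution with input2 on field 0 and
        // emit the first element (if any) of each group from the partial solution.
        DataSet<Tuple1<Long>> iterationStep = iteration
            .coGroup(input2)
            .where(0)
            .equalTo(0)
            .with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
                @Override
                public void coGroup(
                        Iterable<Tuple1<Long>> first,
                        Iterable<Tuple1<Long>> second,
                        Collector<Tuple1<Long>> out) throws Exception {
                    Iterator<Tuple1<Long>> it = first.iterator();
                    if (it.hasNext()) {
                        out.collect(it.next());
                    }
                }
            });
        DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep);
        iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());
        // The plan is optimized when the job is submitted; on the affected
        // version (0.10.0) this is where the valid job is rejected.
        env.execute();
    }
}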
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)