Posted to issues@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2015/11/20 15:01:10 UTC

[jira] [Closed] (FLINK-3052) Optimizer does not push properties out of bulk iterations

     [ https://issues.apache.org/jira/browse/FLINK-3052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann closed FLINK-3052.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 1.0.0

Fixed via 8dc70f2e770ac1c355869f5e0b5e23379b1ed76f

> Optimizer does not push properties out of bulk iterations
> ---------------------------------------------------------
>
>                 Key: FLINK-3052
>                 URL: https://issues.apache.org/jira/browse/FLINK-3052
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.0.0, 0.10.1
>
>
> Flink's optimizer should be able to reuse interesting properties from outside the loop. To do so, it is sometimes necessary to append to the step function a NoOp node that recomputes the required properties.
> This currently does not work for {{BulkIterations}}, because the plans with the appended NoOp nodes are not added to the overall list of candidates.
> This leads not only to sub-optimal plan selection but sometimes also to the rejection of valid jobs. The following job, for example, is incorrectly rejected by Flink.
> {code}
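> import java.util.Iterator;
> import org.apache.flink.api.common.functions.CoGroupFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.DiscardingOutputFormat;
> import org.apache.flink.api.java.operators.IterativeDataSet;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.util.Collector;
>
> // The statements below belong inside a method body, e.g. main().
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> // Two identical inputs: the sequence 1..10, each value wrapped in a Tuple1.
> DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
> 	@Override
> 	public Tuple1<Long> map(Long value) throws Exception {
> 		return new Tuple1<>(value);
> 	}
> });
> DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
> 	@Override
> 	public Tuple1<Long> map(Long value) throws Exception {
> 		return new Tuple1<>(value);
> 	}
> });
> // The deduplicated first input feeds a bulk iteration with at most 10 supersteps.
> DataSet<Tuple1<Long>> distinctInput = input1.distinct();
> IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10);
> // Step function: coGroup the partial solution with input2 on field 0 and
> // forward the first element of each group from the iteration side, if any.
> DataSet<Tuple1<Long>> iterationStep = iteration
> 	.coGroup(input2)
> 	.where(0)
> 	.equalTo(0)
> 	.with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
> 		@Override
> 		public void coGroup(
> 				Iterable<Tuple1<Long>> first,
> 				Iterable<Tuple1<Long>> second,
> 				Collector<Tuple1<Long>> out) throws Exception {
> 			Iterator<Tuple1<Long>> it = first.iterator();
> 			if (it.hasNext()) {
> 				out.collect(it.next());
> 			}
> 		}
> 	});
> // Close the loop and discard the result; only plan generation matters here.
> DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep);
> iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());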
> {code}
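
The failure mode described in the issue (plans with an appended NoOp node are built but never added to the candidate list) can be illustrated with a toy model. This is only a sketch under stated assumptions, not Flink's optimizer code: the class `BulkIterationCandidates`, the `Candidate` type, the `hasRequiredProperties` flag, and the `addNoOpPlans` switch are all hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types exist in Flink.
public class BulkIterationCandidates {

    /** Hypothetical plan candidate: a label plus whether it provides the required properties. */
    static final class Candidate {
        final String name;
        final boolean hasRequiredProperties;

        Candidate(String name, boolean hasRequiredProperties) {
            this.name = name;
            this.hasRequiredProperties = hasRequiredProperties;
        }
    }

    /** Appends a hypothetical NoOp node that re-establishes the required properties. */
    static Candidate appendNoOp(Candidate c) {
        return new Candidate(c.name + " -> NoOp", true);
    }

    /**
     * Collects the valid step-function plans. With addNoOpPlans == false the
     * NoOp-augmented plan is built but dropped, mimicking the reported bug.
     */
    static List<Candidate> enumerate(List<Candidate> stepPlans, boolean addNoOpPlans) {
        List<Candidate> result = new ArrayList<>();
        for (Candidate c : stepPlans) {
            if (c.hasRequiredProperties) {
                result.add(c);
            } else {
                Candidate repaired = appendNoOp(c);
                if (addNoOpPlans) {
                    result.add(repaired);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Candidate> stepPlans = new ArrayList<>();
        stepPlans.add(new Candidate("step plan", false));

        // No candidate survives, so a valid job would be rejected.
        System.out.println("without NoOp plans: " + enumerate(stepPlans, false).size());
        // The repaired plan is kept, so the job can be planned.
        System.out.println("with NoOp plans: " + enumerate(stepPlans, true).size());
    }
}
```

In this model an empty candidate list corresponds to the job being rejected, while a non-empty list that lacks the repaired plans corresponds to the sub-optimal plan selection mentioned in the description.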



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)