Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/11/20 12:33:11 UTC

[jira] [Commented] (FLINK-3052) Optimizer does not push properties out of bulk iterations

    [ https://issues.apache.org/jira/browse/FLINK-3052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015638#comment-15015638 ] 

ASF GitHub Bot commented on FLINK-3052:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1388

    [FLINK-3052] [optimizer] Fix instantiation of bulk iteration candidates

    When a candidate for a bulk iteration is instantiated, the optimizer creates candidates
    for the step function. It then checks whether there is a candidate solution for the step
    function whose properties meet the properties of the input to the bulk iteration. Sometimes
    it is necessary to add a no-op plan node to the end of the step function to generate the
    correct properties. These new candidates have to be added to the final set of accepted
    candidates.
    
    This commit ensures that these new candidates are properly added to the set of accepted candidates.
    
    Fix test and add new iteration tests
    
    Add predecessor operator and dynamic path information to the no-op operator in bulk iterations
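
The candidate handling described above can be illustrated with a small, self-contained sketch. This is not the Flink optimizer code: the class `BulkIterationCandidateSketch`, the `Candidate` type, the single `hashPartitioned` flag standing in for plan properties, and the `keepRepaired` switch are all hypothetical names chosen for illustration. The sketch only shows the shape of the bug: a step-function candidate that misses the required property is repaired with a no-op node, and the repaired candidate must also be placed in the accepted set.

```java
import java.util.ArrayList;
import java.util.List;

public class BulkIterationCandidateSketch {

    /** Toy stand-in for a plan candidate: a name plus one property flag (hypothetical). */
    static final class Candidate {
        final String name;
        final boolean hashPartitioned;

        Candidate(String name, boolean hashPartitioned) {
            this.name = name;
            this.hashPartitioned = hashPartitioned;
        }
    }

    /** Hypothetical no-op node: wraps a candidate and re-establishes the property. */
    static Candidate appendNoOp(Candidate c) {
        return new Candidate("NoOp(" + c.name + ")", true);
    }

    /**
     * Collects the step-function candidates whose property matches the one
     * required at the iteration input. Candidates that miss the property are
     * repaired with a no-op node; keepRepaired = false models the reported
     * behavior, where the repaired candidates were built but never accepted.
     */
    static List<Candidate> acceptCandidates(
            List<Candidate> stepCandidates,
            boolean inputHashPartitioned,
            boolean keepRepaired) {
        List<Candidate> accepted = new ArrayList<>();
        for (Candidate c : stepCandidates) {
            if (!inputHashPartitioned || c.hashPartitioned) {
                accepted.add(c);
            } else {
                Candidate repaired = appendNoOp(c);
                if (keepRepaired) {
                    accepted.add(repaired);
                }
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Candidate> step = new ArrayList<>();
        step.add(new Candidate("CoGroup", false));

        // Repaired candidate dropped: nothing is accepted, so the job would be rejected.
        System.out.println(acceptCandidates(step, true, false).size());

        // Repaired candidate kept: the no-op variant is accepted.
        System.out.println(acceptCandidates(step, true, true).get(0).name);
    }
}
```

With the repaired candidate dropped, the accepted set is empty even though a valid plan exists, which corresponds to the rejection of valid jobs mentioned in the issue.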

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixBulkIteration

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1388.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1388
    
----
commit 27e2085da7423bbb114ece8b9a0f19339c41a696
Author: Till Rohrmann <tr...@apache.org>
Date:   2015-11-19T13:24:15Z

    [FLINK-3052] [optimizer] Fix instantiation of bulk iteration candidates
    

----


> Optimizer does not push properties out of bulk iterations
> ---------------------------------------------------------
>
>                 Key: FLINK-3052
>                 URL: https://issues.apache.org/jira/browse/FLINK-3052
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 0.10.1
>
>
> Flink's optimizer should be able to reuse interesting properties from outside the loop. To do that, it is sometimes necessary to append a NoOp node to the step function, which recomputes the required properties.
> This currently does not work for {{BulkIterations}}, because the plans with the appended NoOp nodes are not added to the overall list of candidates.
> This leads not only to sub-optimal plan selection but sometimes also to the rejection of valid jobs. The following job, for example, is wrongly rejected by Flink.
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> // Two identical inputs: the sequence 1..10, each value wrapped in a Tuple1<Long>.
> DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
> 	@Override
> 	public Tuple1<Long> map(Long value) throws Exception {
> 		return new Tuple1<>(value);
> 	}
> });
> DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
> 	@Override
> 	public Tuple1<Long> map(Long value) throws Exception {
> 		return new Tuple1<>(value);
> 	}
> });
> // Bulk iteration over the distinct input, with at most 10 iterations.
> DataSet<Tuple1<Long>> distinctInput = input1.distinct();
> IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10);
> // Step function: coGroup the partial solution with input2 on field 0 and
> // emit the first element of the partial-solution side, if there is one.
> DataSet<Tuple1<Long>> iterationStep = iteration
> 		.coGroup(input2)
> 		.where(0)
> 		.equalTo(0)
> 		.with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
> 			@Override
> 			public void coGroup(
> 					Iterable<Tuple1<Long>> first,
> 					Iterable<Tuple1<Long>> second,
> 					Collector<Tuple1<Long>> out) throws Exception {
> 				Iterator<Tuple1<Long>> it = first.iterator();
> 				if (it.hasNext()) {
> 					out.collect(it.next());
> 				}
> 			}
> 		});
> // Close the loop and discard the result.
> DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep);
> iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());
> {code}


