Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2014/11/19 14:03:34 UTC

[jira] [Commented] (FLINK-1254) Optimizer bug during pipeline breaker placement

    [ https://issues.apache.org/jira/browse/FLINK-1254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14217864#comment-14217864 ] 

ASF GitHub Bot commented on FLINK-1254:
---------------------------------------

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/incubator-flink/pull/216

    [FLINK-1254] Fix compiler bug for pipeline breaker placement

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink compiler_fix

Alternatively, you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/216.patch
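
For example, one way to fetch and apply that patch locally (a minimal sketch; it assumes curl is available, the working tree is clean, and the local file name is arbitrary):

    $ curl -L https://github.com/apache/incubator-flink/pull/216.patch -o FLINK-1254.patch
    $ git am FLINK-1254.patch

Applying with git am preserves the original commit author and message from the patch.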

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #216
    
----
commit 8ac89d741a949933dc3c93090c642e3d5cd4f21c
Author: Stephan Ewen <se...@apache.org>
Date:   2014-11-19T11:28:06Z

    [FLINK-1254] Fix compiler bug for pipeline breaker placement

----


> Optimizer bug during pipeline breaker placement
> -----------------------------------------------
>
>                 Key: FLINK-1254
>                 URL: https://issues.apache.org/jira/browse/FLINK-1254
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.8-incubating
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.8-incubating
>
>
> The compiler fails on certain programs when trying to place pipeline breakers.
> This code reproduces the error:
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.setDegreeOfParallelism(8);
> // the workset (input two of the delta iteration) is the same as what is consumed by the successive join
> DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
> DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
> // trivial iteration, since we are interested in the inputs to the iteration
> DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
> DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());
> DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);
> initialWorkset
> 	.join(result, JoinHint.REPARTITION_HASH_FIRST)
> 	.where(0).equalTo(0)
> 	.print();
> Plan p = env.createProgramPlan();
> compileNoStats(p);
> {code}
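> 
> For reference, here are minimal sketches of the user functions the snippet assumes (the real versions live in the compiler tests; these are only illustrative, and compileNoStats() is presumed to be the usual CompilerTestBase helper that compiles the plan without statistics):
> {code}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple2;
> 
> // Duplicates the single input field into a (key, value) pair.
> public static final class DuplicateValue implements MapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
>     @Override
>     public Tuple2<Long, Long> map(Tuple1<Long> value) {
>         return new Tuple2<Long, Long>(value.f0, value.f0);
>     }
> }
> 
> // Forwards records unchanged; keeps the iteration step trivial.
> public static final class IdentityMapper<T> implements MapFunction<T, T> {
>     @Override
>     public T map(T value) {
>         return value;
>     }
> }
> {code}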



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)