Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2014/11/19 14:03:34 UTC
[jira] [Commented] (FLINK-1254) Optimizer bug during pipeline breaker placement
[ https://issues.apache.org/jira/browse/FLINK-1254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14217864#comment-14217864 ]
ASF GitHub Bot commented on FLINK-1254:
---------------------------------------
GitHub user StephanEwen opened a pull request:
https://github.com/apache/incubator-flink/pull/216
[FLINK-1254] Fix compiler bug for pipeline breaker placement
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink compiler_fix
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-flink/pull/216.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #216
----
commit 8ac89d741a949933dc3c93090c642e3d5cd4f21c
Author: Stephan Ewen <se...@apache.org>
Date: 2014-11-19T11:28:06Z
[FLINK-1254] Fix compiler bug for pipeline breaker placement
----
> Optimizer bug during pipeline breaker placement
> -----------------------------------------------
>
> Key: FLINK-1254
> URL: https://issues.apache.org/jira/browse/FLINK-1254
> Project: Flink
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 0.8-incubating
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 0.8-incubating
>
>
> The compiler fails on certain programs when trying to place pipeline breakers.
> This code reproduces the error:
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.setDegreeOfParallelism(8);
> // the workset (input two of the delta iteration) is the same as what is consumed by the successive join
> DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
> DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
> // trivial iteration, since we are interested in the inputs to the iteration
> DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
> DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());
> DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);
> initialWorkset
> .join(result, JoinHint.REPARTITION_HASH_FIRST)
> .where(0).equalTo(0)
> .print();
> Plan p = env.createProgramPlan();
> compileNoStats(p);
> {code}
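The reproduction above references two helper classes, `DuplicateValue` and `IdentityMapper`, whose definitions are not included in the comment. A plausible sketch of their logic follows; the class names come from the snippet, but the bodies and the minimal stand-in `MapFunction` interface (used so the sketch compiles without Flink on the classpath) are assumptions, not code from the patch:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class Mappers {

    // Minimal stand-in for Flink's MapFunction interface, so this sketch
    // is self-contained; the real program would use org.apache.flink APIs.
    interface MapFunction<IN, OUT> {
        OUT map(IN value);
    }

    // Assumed behavior: duplicates a single Long into a (Long, Long) pair,
    // matching the Tuple2<Long, Long> type in the reproduction.
    static class DuplicateValue implements MapFunction<Long, Map.Entry<Long, Long>> {
        public Map.Entry<Long, Long> map(Long value) {
            return new SimpleEntry<>(value, value);
        }
    }

    // Assumed behavior: passes each element through unchanged, giving the
    // iteration a trivial step function.
    static class IdentityMapper<T> implements MapFunction<T, T> {
        public T map(T value) {
            return value;
        }
    }

    public static void main(String[] args) {
        Map.Entry<Long, Long> pair = new DuplicateValue().map(42L);
        System.out.println(pair.getKey() + "," + pair.getValue()); // prints 42,42
        System.out.println(new IdentityMapper<Long>().map(7L));    // prints 7
    }
}
```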
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)