Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2014/11/19 12:27:33 UTC
[jira] [Created] (FLINK-1254) Optimizer bug during pipeline breaker placement
Stephan Ewen created FLINK-1254:
-----------------------------------
Summary: Optimizer bug during pipeline breaker placement
Key: FLINK-1254
URL: https://issues.apache.org/jira/browse/FLINK-1254
Project: Flink
Issue Type: Bug
Components: Optimizer
Affects Versions: 0.8-incubating
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 0.8-incubating
The compiler fails on certain programs when trying to place pipeline breakers.
This code reproduces the error:
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(8);
// the workset (input two of the delta iteration) is the same as what is consumed by the subsequent join
DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
// trivial iteration, since we are interested in the inputs to the iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);
initialWorkset
    .join(result, JoinHint.REPARTITION_HASH_FIRST)
    .where(0).equalTo(0)
    .print();
Plan p = env.createProgramPlan();
compileNoStats(p);
{code}
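The shape that matters in the program above seems to be that initialWorkset is consumed twice: once as the workset input of the delta iteration, and once directly by the join that also reads the iteration result. The following is only a toy model of that shape in plain Java, not Flink's optimizer code. The class PlanShape, its methods, and the operator names are hypothetical, and the whole delta iteration is collapsed into a single node as a simplifying assumption. It lists every operator that sits upstream of two different inputs of the same consumer, which is the kind of branch-and-rejoin layout where pipeline breakers typically become relevant.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: none of these names are Flink classes or Flink APIs.
class PlanShape {

    // Maps each operator name to the operators it reads from (its inputs).
    // Assumption: the delta iteration is modelled as one node, "iteration".
    static Map<String, List<String>> reproPlan() {
        Map<String, List<String>> inputs = new LinkedHashMap<>();
        inputs.put("initialWorkset", Collections.<String>emptyList());
        inputs.put("initialSolutionSet", Collections.<String>emptyList());
        inputs.put("iteration", Arrays.asList("initialSolutionSet", "initialWorkset"));
        inputs.put("join", Arrays.asList("initialWorkset", "iteration"));
        inputs.put("print", Arrays.asList("join"));
        return inputs;
    }

    // Adds `node` and everything upstream of it to `seen`.
    static void collectUpstream(String node, Map<String, List<String>> inputs, Set<String> seen) {
        if (seen.add(node)) {
            for (String in : inputs.get(node)) {
                collectUpstream(in, inputs, seen);
            }
        }
    }

    // Returns the operators that are upstream of two different inputs
    // of the same consumer, in order of discovery.
    static List<String> rejoiningSources(Map<String, List<String>> inputs) {
        Set<String> found = new LinkedHashSet<>();
        for (List<String> consumerInputs : inputs.values()) {
            Set<String> seenOnEarlierInputs = new HashSet<>();
            for (String in : consumerInputs) {
                Set<String> upstream = new LinkedHashSet<>();
                collectUpstream(in, inputs, upstream);
                for (String node : upstream) {
                    if (seenOnEarlierInputs.contains(node)) {
                        found.add(node);
                    }
                }
                seenOnEarlierInputs.addAll(upstream);
            }
        }
        return new ArrayList<>(found);
    }

    public static void main(String[] args) {
        // prints [initialWorkset]
        System.out.println(rejoiningSources(reproPlan()));
    }
}
```

In this model only initialWorkset is reported, because the join reaches it both directly and through the iteration node. The two inputs of the iteration itself do not share any upstream operator.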