Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2014/11/19 12:27:33 UTC

[jira] [Created] (FLINK-1254) Optimizer bug during pipeline breaker placement

Stephan Ewen created FLINK-1254:
-----------------------------------

             Summary: Optimizer bug during pipeline breaker placement
                 Key: FLINK-1254
                 URL: https://issues.apache.org/jira/browse/FLINK-1254
             Project: Flink
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 0.8-incubating
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 0.8-incubating


The compiler fails on certain programs when trying to place pipeline breakers.

This code reproduces the error:

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(8);

// the workset (input two of the delta iteration) is the same as what is consumed by the subsequent join
DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

// trivial iteration, since we are interested in the inputs to the iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);

DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());

DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);

initialWorkset
	.join(result, JoinHint.REPARTITION_HASH_FIRST)
	.where(0).equalTo(0)
	.print();

Plan p = env.createProgramPlan();
compileNoStats(p);
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)