You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/12 23:37:11 UTC
incubator-flink git commit: [FLINK-1235] Compiler accepts iterations
referenced from the static path of other iterations - Fix
NepheleJobGraphGenerator to support iterations referenced on the static path
of another iteration - Catch nested iterations on dynamic path properly in optimizer (and give a good error message)
Repository: incubator-flink
Updated Branches:
refs/heads/master 3772d3041 -> 21b1b975c
[FLINK-1235] Compiler accepts iterations referenced from the static path of other iterations
- Fix NepheleJobGraphGenerator to support iterations referenced on the static path of another iteration
- Catch nested iterations on dynamic path properly in optimizer (and give a good error message)
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/21b1b975
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/21b1b975
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/21b1b975
Branch: refs/heads/master
Commit: 21b1b975ccb50e1831172894bde96c6d3269dc57
Parents: 3772d30
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 12 16:21:59 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 12 21:48:07 2014 +0100
----------------------------------------------------------------------
.../org/apache/flink/compiler/PactCompiler.java | 9 +-
.../flink/compiler/dag/BulkIterationNode.java | 15 +-
.../compiler/dag/WorksetIterationNode.java | 10 +
.../plantranslate/NepheleJobGraphGenerator.java | 32 +++-
.../flink/compiler/IterationsCompilerTest.java | 14 +-
.../flink/compiler/NestedIterationsTest.java | 181 +++++++++++++++++++
.../StaticlyNestedIterationsITCase.java | 86 +++++++++
7 files changed, 335 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 15aac32..2ce2495 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -849,7 +849,7 @@ public class PactCompiler {
// we need to ensure that both the next-workset and the solution-set-delta depend on the workset. One check is for free
// during the translation, we do the other check here as a pre-condition
{
- WorksetFinder wsf = new WorksetFinder();
+ StepFunctionValidator wsf = new StepFunctionValidator();
iter.getNextWorkset().accept(wsf);
if (!wsf.foundWorkset) {
throw new CompilerException("In the given program, the next workset does not depend on the workset. This is a prerequisite in delta iterations.");
@@ -943,6 +943,11 @@ public class PactCompiler {
@Override
public void postVisit(OptimizerNode visitable) {
visitable.identifyDynamicPath(this.costWeight);
+
+ // check that there is no nested iteration on the dynamic path
+ if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) {
+ throw new CompilerException("Nested iterations are currently not supported.");
+ }
}
}
@@ -1295,7 +1300,7 @@ public class PactCompiler {
}
}
- private static final class WorksetFinder implements Visitor<Operator<?>> {
+ private static final class StepFunctionValidator implements Visitor<Operator<?>> {
private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
index d3f0fbb..d500925 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.dag;
import java.util.ArrayList;
@@ -60,9 +59,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
private OptimizerNode nextPartialSolution;
- private PactConnection rootConnection;
+ private PactConnection rootConnection; // connection out of the next partial solution
- private PactConnection terminationCriterionRootConnection;
+ private PactConnection terminationCriterionRootConnection; // connection out of the term. criterion
private OptimizerNode singleRoot;
@@ -130,7 +129,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {
// check if the root of the step function has the same DOP as the iteration
- // or if the steo function has any operator at all
+ // or if the step function has any operator at all
if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode)
{
@@ -246,6 +245,14 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
inProps.addLocalProperties(new RequestedLocalProperties());
this.inConn.setInterestingProperties(inProps);
}
+
+ @Override
+ public void clearInterestingProperties() {
+ super.clearInterestingProperties();
+
+ this.singleRoot.accept(InterestingPropertiesClearer.INSTANCE);
+ this.rootConnection.clearInterestingProperties();
+ }
@Override
public void computeUnclosedBranchStack() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index b6ae34e..0dd23bf 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -290,6 +290,16 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
this.input1.setInterestingProperties(partitionedIP);
}
+ @Override
+ public void clearInterestingProperties() {
+ super.clearInterestingProperties();
+
+ this.nextWorksetRootConnection.clearInterestingProperties();
+ this.solutionSetDeltaRootConnection.clearInterestingProperties();
+
+ this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
+ this.solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE);
+ }
@Override
protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index d5f9b94..b717924 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -120,7 +120,9 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
private int iterationIdEnumerator = 1;
- private IterationPlanNode currentIteration; // hack: as long as no nesting is possible, remember the enclosing iteration
+ private IterationPlanNode currentIteration; // the current the enclosing iteration
+
+ private List<IterationPlanNode> iterationStack; // stack of enclosing iterations
private SlotSharingGroup sharingGroup;
@@ -156,12 +158,18 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
this.chainedTasksInSequence = new ArrayList<TaskInChain>();
this.auxVertices = new ArrayList<AbstractJobVertex>();
this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
+ this.iterationStack = new ArrayList<IterationPlanNode>();
this.sharingGroup = new SlotSharingGroup();
// generate Nephele job graph
program.accept(this);
+ // sanity check that we are not somehow in an iteration at the end
+ if (this.currentIteration != null) {
+ throw new CompilerException("The graph translation ended prematurely, leaving an unclosed iteration.");
+ }
+
// finalize the iterations
for (IterationDescriptor iteration : this.iterations.values()) {
if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
@@ -207,7 +215,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
this.chainedTasksInSequence = null;
this.auxVertices = null;
this.iterations = null;
-
+ this.iterationStack = null;
+
// return job graph
return graph;
}
@@ -391,13 +400,26 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// check if we have an iteration. in that case, translate the step function now
if (node instanceof IterationPlanNode) {
- // for now, prevent nested iterations
- if (this.currentIteration != null) {
+ // prevent nested iterations
+ if (node.isOnDynamicPath()) {
throw new CompilerException("Nested Iterations are not possible at the moment!");
}
+
+ // if we recursively go into an iteration (because the constant path of one iteration contains
+ // another one), we push the current one onto the stack
+ if (this.currentIteration != null) {
+ this.iterationStack.add(this.currentIteration);
+ }
+
this.currentIteration = (IterationPlanNode) node;
this.currentIteration.acceptForStepFunction(this);
- this.currentIteration = null;
+
+ // pop the current iteration from the stack
+ if (this.iterationStack.isEmpty()) {
+ this.currentIteration = null;
+ } else {
+ this.currentIteration = this.iterationStack.remove(this.iterationStack.size() - 1);
+ }
// inputs for initial bulk partial solution or initial workset are already connected to the iteration head in the head's post visit.
// connect the initial solution set now.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 0657172..2be28e4 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.junit.Test;
-
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
@@ -40,6 +39,7 @@ import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
@@ -71,7 +71,11 @@ public class IterationsCompilerTest extends CompilerTestBase {
OptimizedPlan p = compileNoStats(env.createProgramPlan());
+ // check that the JSON generator accepts this plan
new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
+
+ // check that the JobGraphGenerator accepts the plan
+ new NepheleJobGraphGenerator().compileJobGraph(p);
}
catch (Exception e) {
e.printStackTrace();
@@ -107,6 +111,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+
+ new NepheleJobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -141,6 +147,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+
+ new NepheleJobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -175,6 +183,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+
+ new NepheleJobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -207,6 +217,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
}
+
+ new NepheleJobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
new file mode 100644
index 0000000..88069fd
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class NestedIterationsTest extends CompilerTestBase {
+
+ @Test
+ public void testRejectNestedBulkIterations() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> data = env.generateSequence(1, 100);
+
+ IterativeDataSet<Long> outerIteration = data.iterate(100);
+
+ IterativeDataSet<Long> innerIteration = outerIteration.map(new IdentityMapper<Long>()).iterate(100);
+
+ DataSet<Long> innerResult = innerIteration.closeWith(innerIteration.map(new IdentityMapper<Long>()));
+
+ DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>()));
+
+ outerResult.print();
+
+ Plan p = env.createProgramPlan();
+
+ try {
+ compileNoStats(p);
+ }
+ catch (CompilerException e) {
+ assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRejectNestedWorksetIterations() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> outerIteration = data.iterateDelta(data, 100, 0);
+
+ DataSet<Tuple2<Long, Long>> inOuter = outerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> innerIteration = inOuter.iterateDelta(inOuter, 100, 0);
+
+ DataSet<Tuple2<Long, Long>> inInner = innerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> innerResult = innerIteration.closeWith(inInner, inInner).map(new IdentityMapper<Tuple2<Long,Long>>());
+
+ DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult);
+
+ outerResult.print();
+
+ Plan p = env.createProgramPlan();
+
+ try {
+ compileNoStats(p);
+ }
+ catch (CompilerException e) {
+ assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBulkIterationInClosure() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> data1 = env.generateSequence(1, 100);
+ DataSet<Long> data2 = env.generateSequence(1, 100);
+
+ IterativeDataSet<Long> firstIteration = data1.iterate(100);
+
+ DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));
+
+
+ IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
+
+ DataSet<Long> joined = mainIteration.join(firstResult)
+ .where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
+ .with(new DummyFlatJoinFunction<Long>());
+
+ DataSet<Long> mainResult = mainIteration.closeWith(joined);
+
+ mainResult.print();
+
+ Plan p = env.createProgramPlan();
+
+ // optimizer should be able to translate this
+ OptimizedPlan op = compileNoStats(p);
+
+ // job graph generator should be able to translate this
+ new NepheleJobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDeltaIterationInClosure() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> data1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+ DataSet<Tuple2<Long, Long>> data2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> firstIteration = data1.iterateDelta(data1, 100, 0);
+
+ DataSet<Tuple2<Long, Long>> inFirst = firstIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> firstResult = firstIteration.closeWith(inFirst, inFirst).map(new IdentityMapper<Tuple2<Long,Long>>());
+
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> mainIteration = data2.iterateDelta(data2, 100, 0);
+
+ DataSet<Tuple2<Long, Long>> joined = mainIteration.getWorkset().join(firstResult).where(0).equalTo(0)
+ .projectFirst(0).projectSecond(0).types(Long.class, Long.class);
+
+ DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
+
+ mainResult.print();
+
+ Plan p = env.createProgramPlan();
+
+ // optimizer should be able to translate this
+ OptimizedPlan op = compileNoStats(p);
+
+ // job graph generator should be able to translate this
+ new NepheleJobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
new file mode 100644
index 0000000..19fc936
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class StaticlyNestedIterationsITCase extends JavaProgramTestBase {
+
+
+ @Override
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> data1 = env.generateSequence(1, 100);
+ DataSet<Long> data2 = env.generateSequence(1, 100);
+
+ IterativeDataSet<Long> firstIteration = data1.iterate(100);
+
+ DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdMapper()));
+
+
+ IterativeDataSet<Long> mainIteration = data2.map(new IdMapper()).iterate(100);
+
+ DataSet<Long> joined = mainIteration.join(firstResult)
+ .where(new IdKeyExtractor()).equalTo(new IdKeyExtractor())
+ .with(new Joiner());
+
+ DataSet<Long> mainResult = mainIteration.closeWith(joined);
+
+ mainResult.print();
+
+ env.execute();
+ }
+
+ private static class IdKeyExtractor implements KeySelector<Long, Long> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long getKey(Long value) {
+ return value;
+ }
+ }
+
+ private static class IdMapper implements MapFunction<Long, Long> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long map(Long value) {
+ return value;
+ }
+ }
+
+ private static class Joiner implements JoinFunction<Long, Long, Long> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long join(Long first, Long second) {
+ return first;
+ }
+ }
+}