You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/12 23:37:11 UTC

incubator-flink git commit: [FLINK-1235] Compiler accepts iterations referenced from the static path of other iterations - Fix NepheleJobGraphGenerator to support iterations referenced on the static path of another iteration - Catch nested iterations on dynamic path properly in optimizer (and give a good error message)

Repository: incubator-flink
Updated Branches:
  refs/heads/master 3772d3041 -> 21b1b975c


[FLINK-1235] Compiler accepts iterations referenced from the static path of other iterations
 - Fix NepheleJobGraphGenerator to support iterations referenced on the static path of another iteration
 - Catch nested iterations on dynamic path properly in optimizer (and give a good error message)


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/21b1b975
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/21b1b975
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/21b1b975

Branch: refs/heads/master
Commit: 21b1b975ccb50e1831172894bde96c6d3269dc57
Parents: 3772d30
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 12 16:21:59 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 12 21:48:07 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |   9 +-
 .../flink/compiler/dag/BulkIterationNode.java   |  15 +-
 .../compiler/dag/WorksetIterationNode.java      |  10 +
 .../plantranslate/NepheleJobGraphGenerator.java |  32 +++-
 .../flink/compiler/IterationsCompilerTest.java  |  14 +-
 .../flink/compiler/NestedIterationsTest.java    | 181 +++++++++++++++++++
 .../StaticlyNestedIterationsITCase.java         |  86 +++++++++
 7 files changed, 335 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 15aac32..2ce2495 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -849,7 +849,7 @@ public class PactCompiler {
 				// we need to ensure that both the next-workset and the solution-set-delta depend on the workset. One check is for free
 				// during the translation, we do the other check here as a pre-condition
 				{
-					WorksetFinder wsf = new WorksetFinder();
+					StepFunctionValidator wsf = new StepFunctionValidator();
 					iter.getNextWorkset().accept(wsf);
 					if (!wsf.foundWorkset) {
 						throw new CompilerException("In the given program, the next workset does not depend on the workset. This is a prerequisite in delta iterations.");
@@ -943,6 +943,11 @@ public class PactCompiler {
 		@Override
 		public void postVisit(OptimizerNode visitable) {
 			visitable.identifyDynamicPath(this.costWeight);
+			
+			// check that there is no nested iteration on the dynamic path
+			if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) {
+				throw new CompilerException("Nested iterations are currently not supported.");
+			}
 		}
 	}
 	
@@ -1295,7 +1300,7 @@ public class PactCompiler {
 		}
 	}
 	
-	private static final class WorksetFinder implements Visitor<Operator<?>> {
+	private static final class StepFunctionValidator implements Visitor<Operator<?>> {
 
 		private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
index d3f0fbb..d500925 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.ArrayList;
@@ -60,9 +59,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	
 	private OptimizerNode nextPartialSolution;
 	
-	private PactConnection rootConnection;
+	private PactConnection rootConnection;		// connection out of the next partial solution
 	
-	private PactConnection terminationCriterionRootConnection;
+	private PactConnection terminationCriterionRootConnection;	// connection out of the term. criterion
 	
 	private OptimizerNode singleRoot;
 	
@@ -130,7 +129,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {
 		
 		// check if the root of the step function has the same DOP as the iteration
-		// or if the steo function has any operator at all
+		// or if the step function has any operator at all
 		if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
 			nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode)
 		{
@@ -246,6 +245,14 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		inProps.addLocalProperties(new RequestedLocalProperties());
 		this.inConn.setInterestingProperties(inProps);
 	}
+	
+	@Override
+	public void clearInterestingProperties() {
+		super.clearInterestingProperties();
+		
+		this.singleRoot.accept(InterestingPropertiesClearer.INSTANCE);
+		this.rootConnection.clearInterestingProperties();
+	}
 
 	@Override
 	public void computeUnclosedBranchStack() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index b6ae34e..0dd23bf 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -290,6 +290,16 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		this.input1.setInterestingProperties(partitionedIP);
 	}
 	
+	@Override
+	public void clearInterestingProperties() {
+		super.clearInterestingProperties();
+		
+		this.nextWorksetRootConnection.clearInterestingProperties();
+		this.solutionSetDeltaRootConnection.clearInterestingProperties();
+		
+		this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
+		this.solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE);
+	}
 	
 	@Override
 	protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index d5f9b94..b717924 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -120,7 +120,9 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 	
 	private int iterationIdEnumerator = 1;
 	
-	private IterationPlanNode currentIteration;	// hack: as long as no nesting is possible, remember the enclosing iteration
+	private IterationPlanNode currentIteration; // the current enclosing iteration
+	
+	private List<IterationPlanNode> iterationStack;  // stack of enclosing iterations
 	
 	private SlotSharingGroup sharingGroup;
 	
@@ -156,12 +158,18 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		this.chainedTasksInSequence = new ArrayList<TaskInChain>();
 		this.auxVertices = new ArrayList<AbstractJobVertex>();
 		this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
+		this.iterationStack = new ArrayList<IterationPlanNode>();
 		
 		this.sharingGroup = new SlotSharingGroup();
 		
 		// generate Nephele job graph
 		program.accept(this);
 		
+		// sanity check that we are not somehow in an iteration at the end
+		if (this.currentIteration != null) {
+			throw new CompilerException("The graph translation ended prematurely, leaving an unclosed iteration.");
+		}
+		
 		// finalize the iterations
 		for (IterationDescriptor iteration : this.iterations.values()) {
 			if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
@@ -207,7 +215,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		this.chainedTasksInSequence = null;
 		this.auxVertices = null;
 		this.iterations = null;
-
+		this.iterationStack = null;
+		
 		// return job graph
 		return graph;
 	}
@@ -391,13 +400,26 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			
 			// check if we have an iteration. in that case, translate the step function now
 			if (node instanceof IterationPlanNode) {
-				// for now, prevent nested iterations
-				if (this.currentIteration != null) {
+				// prevent nested iterations
+				if (node.isOnDynamicPath()) {
 					throw new CompilerException("Nested Iterations are not possible at the moment!");
 				}
+				
+				// if we recursively go into an iteration (because the constant path of one iteration contains
+				// another one), we push the current one onto the stack
+				if (this.currentIteration != null) {
+					this.iterationStack.add(this.currentIteration);
+				}
+				
 				this.currentIteration = (IterationPlanNode) node;
 				this.currentIteration.acceptForStepFunction(this);
-				this.currentIteration = null;
+				
+				// pop the current iteration from the stack
+				if (this.iterationStack.isEmpty()) {
+					this.currentIteration = null;
+				} else {
+					this.currentIteration = this.iterationStack.remove(this.iterationStack.size() - 1);
+				}
 				
 				// inputs for initial bulk partial solution or initial workset are already connected to the iteration head in the head's post visit.
 				// connect the initial solution set now.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 0657172..2be28e4 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.junit.Test;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DeltaIteration;
@@ -40,6 +39,7 @@ import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
 import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Collector;
@@ -71,7 +71,11 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			OptimizedPlan p = compileNoStats(env.createProgramPlan());
 			
+			// check that the JSON generator accepts this plan
 			new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
+			
+			// check that the JobGraphGenerator accepts the plan
+			new NepheleJobGraphGenerator().compileJobGraph(p);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -107,6 +111,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
 			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+			
+			new NepheleJobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -141,6 +147,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
 			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+			
+			new NepheleJobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -175,6 +183,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
 			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+			
+			new NepheleJobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -207,6 +217,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
 				assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
 			}
+			
+			new NepheleJobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
new file mode 100644
index 0000000..88069fd
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class NestedIterationsTest extends CompilerTestBase {
+
+	@Test
+	public void testRejectNestedBulkIterations() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> data = env.generateSequence(1, 100);
+			
+			IterativeDataSet<Long> outerIteration = data.iterate(100);
+			
+			IterativeDataSet<Long> innerIteration = outerIteration.map(new IdentityMapper<Long>()).iterate(100);
+			
+			DataSet<Long> innerResult = innerIteration.closeWith(innerIteration.map(new IdentityMapper<Long>()));
+			
+			DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>()));
+			
+			outerResult.print();
+			
+			Plan p = env.createProgramPlan();
+			
+			try {
+				compileNoStats(p);
+			}
+			catch (CompilerException e) {
+				assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRejectNestedWorksetIterations() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> outerIteration = data.iterateDelta(data, 100, 0);
+			
+			DataSet<Tuple2<Long, Long>> inOuter = outerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> innerIteration = inOuter.iterateDelta(inOuter, 100, 0);
+			
+			DataSet<Tuple2<Long, Long>> inInner = innerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+			
+			DataSet<Tuple2<Long, Long>> innerResult = innerIteration.closeWith(inInner, inInner).map(new IdentityMapper<Tuple2<Long,Long>>());
+			
+			DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult);
+			
+			outerResult.print();
+			
+			Plan p = env.createProgramPlan();
+			
+			try {
+				compileNoStats(p);
+			}
+			catch (CompilerException e) {
+				assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testBulkIterationInClosure() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> data1 = env.generateSequence(1, 100);
+			DataSet<Long> data2 = env.generateSequence(1, 100);
+			
+			IterativeDataSet<Long> firstIteration = data1.iterate(100);
+			
+			DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));
+			
+			
+			IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
+			
+			DataSet<Long> joined = mainIteration.join(firstResult)
+					.where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
+					.with(new DummyFlatJoinFunction<Long>());
+			
+			DataSet<Long> mainResult = mainIteration.closeWith(joined);
+			
+			mainResult.print();
+			
+			Plan p = env.createProgramPlan();
+			
+			// optimizer should be able to translate this
+			OptimizedPlan op = compileNoStats(p);
+			
+			// job graph generator should be able to translate this
+			new NepheleJobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDeltaIterationInClosure() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> data1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple2<Long, Long>> data2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> firstIteration = data1.iterateDelta(data1, 100, 0);
+			
+			DataSet<Tuple2<Long, Long>> inFirst = firstIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+			
+			DataSet<Tuple2<Long, Long>> firstResult = firstIteration.closeWith(inFirst, inFirst).map(new IdentityMapper<Tuple2<Long,Long>>());
+			
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> mainIteration = data2.iterateDelta(data2, 100, 0);
+			
+			DataSet<Tuple2<Long, Long>> joined = mainIteration.getWorkset().join(firstResult).where(0).equalTo(0)
+							.projectFirst(0).projectSecond(0).types(Long.class, Long.class);
+			
+			DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
+			
+			mainResult.print();
+			
+			Plan p = env.createProgramPlan();
+			
+			// optimizer should be able to translate this
+			OptimizedPlan op = compileNoStats(p);
+			
+			// job graph generator should be able to translate this
+			new NepheleJobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/21b1b975/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
new file mode 100644
index 0000000..19fc936
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class StaticlyNestedIterationsITCase extends JavaProgramTestBase {
+
+	
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<Long> data1 = env.generateSequence(1, 100);
+		DataSet<Long> data2 = env.generateSequence(1, 100);
+		
+		IterativeDataSet<Long> firstIteration = data1.iterate(100);
+		
+		DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdMapper()));
+		
+		
+		IterativeDataSet<Long> mainIteration = data2.map(new IdMapper()).iterate(100);
+		
+		DataSet<Long> joined = mainIteration.join(firstResult)
+				.where(new IdKeyExtractor()).equalTo(new IdKeyExtractor())
+				.with(new Joiner());
+		
+		DataSet<Long> mainResult = mainIteration.closeWith(joined);
+		
+		mainResult.print();
+		
+		env.execute();
+	}
+	
+	private static class IdKeyExtractor implements KeySelector<Long, Long> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long getKey(Long value) {
+			return value;
+		}
+	}
+	
+	private static class IdMapper implements MapFunction<Long, Long> {
+		
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public Long map(Long value) {
+			return value;
+		}
+	}
+	
+	private static class Joiner implements JoinFunction<Long, Long, Long> {
+		
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public Long join(Long first, Long second) {
+			return first;
+		}
+	}
+}