Posted to commits@flink.apache.org by se...@apache.org on 2015/03/17 10:16:25 UTC

[5/8] flink git commit: [FLINK-1671] [optimizer] Add data exchange mode to optimizer classes

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeForwardTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeForwardTest.java
new file mode 100644
index 0000000..e550749
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeForwardTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.compiler.testfunctions.Top1GroupReducer;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test verifies that the optimizer assigns the correct
+ * data exchange mode to a simple forward / shuffle plan.
+ *
+ * <pre>
+ *     (source) -> (map) -> (filter) -> (groupBy / reduce)
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class DataExchangeModeForwardTest extends CompilerTestBase {
+
+	@Test
+	public void testPipelinedForced() {
+		// PIPELINED_FORCED should result in pipelining all the way
+		verifySimpleForwardPlan(ExecutionMode.PIPELINED_FORCED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testPipelined() {
+		// PIPELINED should result in pipelining all the way
+		verifySimpleForwardPlan(ExecutionMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatch() {
+		// BATCH should batch only the shuffle (the reduce input); forward channels stay pipelined
+		verifySimpleForwardPlan(ExecutionMode.BATCH,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatchForced() {
+		// BATCH_FORCED should batch every exchange, except the combiner input, which stays pipelined
+		verifySimpleForwardPlan(ExecutionMode.BATCH_FORCED,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH);
+	}
+
+	private void verifySimpleForwardPlan(ExecutionMode execMode,
+										DataExchangeMode toMap,
+										DataExchangeMode toFilter,
+										DataExchangeMode toKeyExtractor,
+										DataExchangeMode toCombiner,
+										DataExchangeMode toReduce,
+										DataExchangeMode toSink)
+	{
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.getConfig().setExecutionMode(execMode);
+
+			DataSet<String> dataSet = env.readTextFile("/never/accessed");
+			dataSet
+				.map(new MapFunction<String, Integer>() {
+					@Override
+					public Integer map(String value) {
+						return 0;
+					}
+				})
+				.filter(new FilterFunction<Integer>() {
+					@Override
+					public boolean filter(Integer value) {
+						return false;
+					}
+				})
+				.groupBy(new IdentityKeyExtractor<Integer>())
+				.reduceGroup(new Top1GroupReducer<Integer>())
+				.output(new DiscardingOutputFormat<Integer>());
+
+			OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+			SinkPlanNode sinkNode = optPlan.getDataSinks().iterator().next();
+
+			SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getPredecessor();
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getPredecessor();
+			SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode) combineNode.getPredecessor();
+
+			SingleInputPlanNode filterNode = (SingleInputPlanNode) keyExtractorNode.getPredecessor();
+			SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+			assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+			assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+			assertEquals(toKeyExtractor, keyExtractorNode.getInput().getDataExchangeMode());
+			assertEquals(toCombiner, combineNode.getInput().getDataExchangeMode());
+			assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode());
+			assertEquals(toSink, sinkNode.getInput().getDataExchangeMode());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

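A note on the tests above: the knob they exercise is the user-facing execution mode on the ExecutionConfig. As a rough sketch (using the same 0.9-era DataSet API as the tests; the job itself is illustrative and not part of the commit), a program opts into fully batched exchanges like this:

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	// force every exchange to be materialized; this is the safest mode with
	// respect to pipeline deadlocks, at the cost of losing pipelining
	env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);

	env.generateSequence(1, 100)
		.map(new MapFunction<Long, Long>() {
			@Override
			public Long map(Long value) { return value + 1; }
		})
		.output(new DiscardingOutputFormat<Long>());

	// the optimizer then assigns DataExchangeMode.BATCH to all connections,
	// except always-pipelined ones such as the combiner input (see above)
	env.execute();
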
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeOpenBranchingTest.java
new file mode 100644
index 0000000..e4e0cba
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test checks the correct assignment of the DataExchangeMode to
+ * connections for programs that branch, but do not re-join the branches.
+ *
+ * <pre>
+ *                      /---> (filter) -> (sink)
+ *                     /
+ *                    /
+ * (source) -> (map) -----------------\
+ *                    \               (join) -> (sink)
+ *                     \   (source) --/
+ *                      \
+ *                       \
+ *                        \-> (sink)
+ * </pre>
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
+
+	@Test
+	public void testPipelinedForced() {
+		// PIPELINED_FORCED should result in pipelining all the way
+		verifyBranchingPlan(ExecutionMode.PIPELINED_FORCED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testPipelined() {
+		// PIPELINED should result in pipelining all the way
+		verifyBranchingPlan(ExecutionMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatch() {
+		// BATCH should batch the shuffled join inputs; forward channels stay pipelined
+		verifyBranchingPlan(ExecutionMode.BATCH,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatchForced() {
+		// BATCH_FORCED should result in batching all the way
+		verifyBranchingPlan(ExecutionMode.BATCH_FORCED,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH);
+	}
+
+	private void verifyBranchingPlan(ExecutionMode execMode,
+									DataExchangeMode toMap,
+									DataExchangeMode toFilter,
+									DataExchangeMode toFilterSink,
+									DataExchangeMode toJoin1,
+									DataExchangeMode toJoin2,
+									DataExchangeMode toJoinSink,
+									DataExchangeMode toDirectSink)
+	{
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.getConfig().setExecutionMode(execMode);
+
+			DataSet<Tuple2<Long, Long>> data = env.generateSequence(1, 100000)
+					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Long value) {
+							return new Tuple2<Long, Long>(value, value);
+						}
+					});
+
+			// output 1
+			data
+					.filter(new FilterFunction<Tuple2<Long, Long>>() {
+						@Override
+						public boolean filter(Tuple2<Long, Long> value) {
+							return false;
+						}
+					})
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink1");
+
+			// output 2 does a join before the sink
+			data
+					.join(env.fromElements(new Tuple2<Long, Long>(1L, 2L)))
+					.where(1)
+					.equalTo(0)
+					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("sink2");
+
+			// output 3 is direct
+			data
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink3");
+
+			OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+
+			SinkPlanNode filterSink = findSink(optPlan.getDataSinks(), "sink1");
+			SinkPlanNode joinSink = findSink(optPlan.getDataSinks(), "sink2");
+			SinkPlanNode directSink = findSink(optPlan.getDataSinks(), "sink3");
+
+			SingleInputPlanNode filterNode = (SingleInputPlanNode) filterSink.getPredecessor();
+			SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+			DualInputPlanNode joinNode = (DualInputPlanNode) joinSink.getPredecessor();
+			assertEquals(mapNode, joinNode.getInput1().getSource());
+
+			assertEquals(mapNode, directSink.getPredecessor());
+
+			assertEquals(toFilterSink, filterSink.getInput().getDataExchangeMode());
+			assertEquals(toJoinSink, joinSink.getInput().getDataExchangeMode());
+			assertEquals(toDirectSink, directSink.getInput().getDataExchangeMode());
+
+			assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+			assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+
+			assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode());
+			assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
+		for (SinkPlanNode node : collection) {
+			String nodeName = node.getOptimizerNode().getPactContract().getName();
+			if (nodeName != null && nodeName.equals(name)) {
+				return node;
+			}
+		}
+
+		throw new IllegalArgumentException("No sink node with name '" + name + "' was found.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/PipelineBreakingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/PipelineBreakingTest.java
new file mode 100644
index 0000000..f6a77c6
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/PipelineBreakingTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataexchange;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.PactCompiler;
+import org.apache.flink.compiler.dag.DataSinkNode;
+import org.apache.flink.compiler.dag.OptimizerNode;
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dag.SinkJoiner;
+import org.apache.flink.compiler.dag.TwoInputNode;
+import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.compiler.testfunctions.IdentityFlatMapper;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.compiler.testfunctions.SelectOneReducer;
+import org.apache.flink.compiler.testfunctions.Top1GroupReducer;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * This test checks whether connections are correctly marked as pipeline breaking.
+ */
+@SuppressWarnings("serial")
+public class PipelineBreakingTest {
+
+	/**
+	 * Tests that no pipeline breakers are inserted into a simple forward
+	 * pipeline.
+	 *
+	 * <pre>
+	 *     (source) -> (map) -> (filter) -> (groupBy / reduce)
+	 * </pre>
+	 */
+	@Test
+	public void testSimpleForwardPlan() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<String> dataSet = env.readTextFile("/never/accessed");
+			dataSet
+				.map(new MapFunction<String, Integer>() {
+					@Override
+					public Integer map(String value) {
+						return 0;
+					}
+				})
+				.filter(new FilterFunction<Integer>() {
+					@Override
+					public boolean filter(Integer value) {
+						return false;
+					}
+				})
+				.groupBy(new IdentityKeyExtractor<Integer>())
+				.reduceGroup(new Top1GroupReducer<Integer>())
+				.output(new DiscardingOutputFormat<Integer>());
+
+			DataSinkNode sinkNode = convertPlan(env.createProgramPlan()).get(0);
+
+			SingleInputNode reduceNode = (SingleInputNode) sinkNode.getPredecessorNode();
+			SingleInputNode keyExtractorNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+			SingleInputNode filterNode = (SingleInputNode) keyExtractorNode.getPredecessorNode();
+			SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+			assertFalse(sinkNode.getInputConnection().isBreakingPipeline());
+			assertFalse(reduceNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(keyExtractorNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests that branching plans, where the branches are not re-joined,
+	 * do not place pipeline breakers.
+	 * 
+	 * <pre>
+	 *                      /---> (filter) -> (sink)
+	 *                     /
+	 *                    /
+	 * (source) -> (map) -----------------\
+	 *                    \               (join) -> (sink)
+	 *                     \   (source) --/
+	 *                      \
+	 *                       \
+	 *                        \-> (sink)
+	 * </pre>
+	 */
+	@Test
+	public void testBranchingPlanNotReJoined() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Integer> data = env.readTextFile("/never/accessed")
+				.map(new MapFunction<String, Integer>() {
+					@Override
+					public Integer map(String value) {
+						return 0;
+					}
+				});
+
+			// output 1
+			data
+				.filter(new FilterFunction<Integer>() {
+					@Override
+					public boolean filter(Integer value) {
+						return false;
+					}
+				})
+				.output(new DiscardingOutputFormat<Integer>());
+
+			// output 2 does a join before the sink
+			data
+				.join(env.fromElements(1, 2, 3, 4))
+					.where(new IdentityKeyExtractor<Integer>())
+					.equalTo(new IdentityKeyExtractor<Integer>())
+				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+			// output 3 is direct
+			data
+				.output(new DiscardingOutputFormat<Integer>());
+
+			List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+			// gather the optimizer DAG nodes
+
+			DataSinkNode sinkAfterFilter = sinks.get(0);
+			DataSinkNode sinkAfterJoin = sinks.get(1);
+			DataSinkNode sinkDirect = sinks.get(2);
+
+			SingleInputNode filterNode = (SingleInputNode) sinkAfterFilter.getPredecessorNode();
+			SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+			TwoInputNode joinNode = (TwoInputNode) sinkAfterJoin.getPredecessorNode();
+			SingleInputNode joinInput = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+			// verify the non-pipeline breaking status
+
+			assertFalse(sinkAfterFilter.getInputConnection().isBreakingPipeline());
+			assertFalse(sinkAfterJoin.getInputConnection().isBreakingPipeline());
+			assertFalse(sinkDirect.getInputConnection().isBreakingPipeline());
+
+			assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+
+			assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+			assertFalse(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+			assertFalse(joinInput.getIncomingConnection().isBreakingPipeline());
+
+			// some other sanity checks on the plan construction (cannot hurt)
+
+			assertEquals(mapNode, ((SingleInputNode) joinNode.getFirstPredecessorNode()).getPredecessorNode());
+			assertEquals(mapNode, sinkDirect.getPredecessorNode());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests that branches that are re-joined have pipeline breakers placed.
+	 * 
+	 * <pre>
+	 *                                         /-> (sink)
+	 *                                        /
+	 *                         /-> (reduce) -+          /-> (flatmap) -> (sink)
+	 *                        /               \        /
+	 *     (source) -> (map) -                (join) -+-----\
+	 *                        \               /              \
+	 *                         \-> (filter) -+                \
+	 *                                       \                (co group) -> (sink)
+	 *                                        \                /
+	 *                                         \-> (reduce) - /
+	 * </pre>
+	 */
+	@Test
+	public void testReJoinedBranches() {
+		try {
+			// build a test program
+
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
+					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Long value) {
+							return new Tuple2<Long, Long>(value, value);
+						}
+					});
+
+			DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
+			reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			
+			DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() {
+				@Override
+				public boolean filter(Tuple2<Long, Long> value) throws Exception {
+					return false;
+				}
+			});
+			
+			DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered)
+					.where(1).equalTo(1)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+			
+			joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>())
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+			joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()))
+					.where(0).equalTo(0)
+					.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+			List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+			// gather the optimizer DAG nodes
+
+			DataSinkNode sinkAfterReduce = sinks.get(0);
+			DataSinkNode sinkAfterFlatMap = sinks.get(1);
+			DataSinkNode sinkAfterCoGroup = sinks.get(2);
+
+			SingleInputNode reduceNode = (SingleInputNode) sinkAfterReduce.getPredecessorNode();
+			SingleInputNode mapNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+			SingleInputNode flatMapNode = (SingleInputNode) sinkAfterFlatMap.getPredecessorNode();
+			TwoInputNode joinNode = (TwoInputNode) flatMapNode.getPredecessorNode();
+			SingleInputNode filterNode = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+			TwoInputNode coGroupNode = (TwoInputNode) sinkAfterCoGroup.getPredecessorNode();
+			SingleInputNode otherReduceNode = (SingleInputNode) coGroupNode.getSecondPredecessorNode();
+
+			// test sanity checks (that we constructed the DAG correctly)
+
+			assertEquals(reduceNode, joinNode.getFirstPredecessorNode());
+			assertEquals(mapNode, filterNode.getPredecessorNode());
+			assertEquals(joinNode, coGroupNode.getFirstPredecessorNode());
+			assertEquals(filterNode, otherReduceNode.getPredecessorNode());
+
+			// verify the pipeline breaking status
+
+			assertFalse(sinkAfterReduce.getInputConnection().isBreakingPipeline());
+			assertFalse(sinkAfterFlatMap.getInputConnection().isBreakingPipeline());
+			assertFalse(sinkAfterCoGroup.getInputConnection().isBreakingPipeline());
+
+			assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(flatMapNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+			assertFalse(coGroupNode.getFirstIncomingConnection().isBreakingPipeline());
+			assertFalse(coGroupNode.getSecondIncomingConnection().isBreakingPipeline());
+
+			// these should be pipeline breakers
+			assertTrue(reduceNode.getIncomingConnection().isBreakingPipeline());
+			assertTrue(filterNode.getIncomingConnection().isBreakingPipeline());
+			assertTrue(otherReduceNode.getIncomingConnection().isBreakingPipeline());
+			assertTrue(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private static List<DataSinkNode> convertPlan(Plan p) {
+		PactCompiler.GraphCreatingVisitor dagCreator =
+				new PactCompiler.GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
+
+		// create the DAG
+		p.accept(dagCreator);
+		List<DataSinkNode> sinks = dagCreator.getSinks();
+
+		// build a single root and run the branch tracking logic
+		OptimizerNode rootNode;
+		if (sinks.size() == 1) {
+			rootNode = sinks.get(0);
+		}
+		else {
+			Iterator<DataSinkNode> iter = sinks.iterator();
+			rootNode = iter.next();
+
+			while (iter.hasNext()) {
+				rootNode = new SinkJoiner(rootNode, iter.next());
+			}
+		}
+		rootNode.accept(new PactCompiler.IdAndEstimatesVisitor(null));
+		rootNode.accept(new PactCompiler.BranchesVisitor());
+
+		return sinks;
+	}
+}

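The breaker placement asserted in testReJoinedBranches follows from deadlock avoidance: once branches re-join, the two inputs of the joining operator transitively share a source, so at least one path must be decoupled from the pipeline. A minimal diamond that forces this (a sketch reusing the test utilities imported above; not part of the commit):

	DataSet<Long> src = env.generateSequence(1, 10);

	DataSet<Long> reduced = src
		.groupBy(new IdentityKeyExtractor<Long>())
		.reduceGroup(new Top1GroupReducer<Long>());

	// 'reduced' and 'src' share their origin, so the optimizer must mark at
	// least one of the two join inputs (or a connection upstream of one of
	// them) as pipeline breaking
	reduced.join(src)
		.where(new IdentityKeyExtractor<Long>())
		.equalTo(new IdentityKeyExtractor<Long>())
		.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
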
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityFlatMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityFlatMapper.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityFlatMapper.java
new file mode 100644
index 0000000..5fe32b4
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityFlatMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityFlatMapper<T> implements FlatMapFunction<T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void flatMap(T value, Collector<T> out) {
+		out.collect(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index e99cac7..f75b797 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators;
 
 import java.util.ArrayList;
@@ -26,9 +25,10 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.types.Key;
 
 /**
- *
+ * This class represents an ordering on a set of fields. It specifies the fields and order direction
+ * (ascending, descending).
  */
-public class Ordering {
+public class Ordering implements Cloneable {
 	
 	protected FieldList indexes = new FieldList();
 	
@@ -212,9 +212,6 @@ public class Ordering {
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
-	
-	
 
 	public Ordering clone() {
 		Ordering newOrdering = new Ordering();
@@ -223,7 +220,6 @@ public class Ordering {
 		newOrdering.orders.addAll(this.orders);
 		return newOrdering;
 	}
-	
 
 	@Override
 	public int hashCode() {
@@ -235,7 +231,6 @@ public class Ordering {
 		return result;
 	}
 
-
 	@Override
 	public boolean equals(Object obj) {
 		if (this == obj) {
@@ -273,7 +268,7 @@ public class Ordering {
 	}
 
 	public String toString() {
-		final StringBuffer buf = new StringBuffer("[");
+		final StringBuilder buf = new StringBuilder("[");
 		for (int i = 0; i < indexes.size(); i++) {
 			if (i != 0) {
 				buf.append(",");

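On the Ordering change: clone() builds a fresh instance instead of delegating to Object.clone(), so the added Cloneable marker is mainly declarative. What it buys callers is a copy whose order list is independent of the original. A usage sketch (assuming the Ordering(int, Class, Order) constructor and appendOrdering(...) defined elsewhere in this class):

	Ordering original = new Ordering(0, IntValue.class, Order.ASCENDING);
	Ordering copy = original.clone();

	// extending the copy leaves 'original' untouched, because clone()
	// copied the index and order lists rather than aliasing them
	copy.appendOrdering(1, StringValue.class, Order.DESCENDING);
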
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java
new file mode 100644
index 0000000..7810c3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * Defines how the data exchange between two specific operators happens.
+ */
+public enum DataExchangeMode {
+
+	/**
+	 * The data exchange is streamed, sender and receiver are online at the same time,
+	 * and the receiver back-pressures the sender.
+	 */
+	PIPELINED,
+
+	/**
+	 * The data exchange is decoupled. The sender first produces its entire result and finishes.
+	 * After that, the receiver is started and may consume the data.
+	 */
+	BATCH,
+
+	/**
+	 * The data exchange starts like {@link #PIPELINED} and falls back to {@link #BATCH}
+	 * for recovery runs.
+	 */
+	PIPELINE_WITH_BATCH_FALLBACK;
+
+	// ------------------------------------------------------------------------
+
+	public static DataExchangeMode getForForwardExchange(ExecutionMode mode) {
+		return FORWARD[mode.ordinal()];
+	}
+
+	public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode) {
+		return SHUFFLE[mode.ordinal()];
+	}
+
+	public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode) {
+		return BREAKING[mode.ordinal()];
+	}
+
+	/**
+	 * Computes the mode of data exchange to be used for a connection, given an execution mode and
+	 * ship strategy. The type of the data exchange also depends on whether the connection has been
+	 * identified to require pipeline breaking for deadlock avoidance.
+	 * <ul>
+	 *     <li>If the connection is set to be pipeline breaking, this returns the pipeline breaking variant
+	 *         of the execution mode
+	 *         {@link org.apache.flink.runtime.io.network.DataExchangeMode#getPipelineBreakingExchange(org.apache.flink.api.common.ExecutionMode)}.
+	 *     </li>
+	 *     <li>If the data exchange is a simple FORWARD (one-to-one communication), this returns
+	 *         {@link org.apache.flink.runtime.io.network.DataExchangeMode#getForForwardExchange(org.apache.flink.api.common.ExecutionMode)}.
+	 *     </li>
+	 *     <li>Otherwise, this returns
+	 *         {@link org.apache.flink.runtime.io.network.DataExchangeMode#getForShuffleOrBroadcast(org.apache.flink.api.common.ExecutionMode)}.
+	 *     </li>
+	 * </ul>
+	 *
+	 * @param executionMode The execution mode set for the program.
+	 * @param shipStrategy The ship strategy (FORWARD, PARTITION, BROADCAST, ...) of the runtime data exchange.
+	 * @param breakPipeline True, if the connection must break the pipeline for deadlock avoidance.
+	 * @return The data exchange mode for the connection, given the concrete ship strategy.
+	 */
+	public static DataExchangeMode select(ExecutionMode executionMode, ShipStrategyType shipStrategy,
+													boolean breakPipeline) {
+
+		if (shipStrategy == null || shipStrategy == ShipStrategyType.NONE) {
+			throw new IllegalArgumentException("shipStrategy may not be null or NONE");
+		}
+		if (executionMode == null) {
+			throw new IllegalArgumentException("executionMode may not mbe null");
+		}
+
+		if (breakPipeline) {
+			return getPipelineBreakingExchange(executionMode);
+		}
+		else if (shipStrategy == ShipStrategyType.FORWARD) {
+			return getForForwardExchange(executionMode);
+		}
+		else {
+			return getForShuffleOrBroadcast(executionMode);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final DataExchangeMode[] FORWARD = new DataExchangeMode[ExecutionMode.values().length];
+
+	private static final DataExchangeMode[] SHUFFLE = new DataExchangeMode[ExecutionMode.values().length];
+
+	private static final DataExchangeMode[] BREAKING = new DataExchangeMode[ExecutionMode.values().length];
+
+	// initialize the mapping between execution modes and exchange modes
+	static {
+		FORWARD[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
+		SHUFFLE[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
+		BREAKING[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
+
+		FORWARD[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
+		SHUFFLE[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
+		BREAKING[ExecutionMode.PIPELINED.ordinal()] = BATCH;
+
+		FORWARD[ExecutionMode.BATCH.ordinal()] = PIPELINED;
+		SHUFFLE[ExecutionMode.BATCH.ordinal()] = BATCH;
+		BREAKING[ExecutionMode.BATCH.ordinal()] = BATCH;
+
+		FORWARD[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
+		SHUFFLE[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
+		BREAKING[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
+	}
+}

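The three lookup tables above reduce select(...) to a small decision. The resulting behavior, spelled out (the expected values follow directly from the static initializer in this diff; ShipStrategyType is from org.apache.flink.runtime.operators.shipping):

	// a plain forward edge under ExecutionMode.BATCH stays pipelined
	assert DataExchangeMode.select(ExecutionMode.BATCH,
			ShipStrategyType.FORWARD, false) == DataExchangeMode.PIPELINED;

	// a hash-partitioned shuffle under ExecutionMode.BATCH is materialized
	assert DataExchangeMode.select(ExecutionMode.BATCH,
			ShipStrategyType.PARTITION_HASH, false) == DataExchangeMode.BATCH;

	// a connection flagged as pipeline breaking is batched even when forward
	assert DataExchangeMode.select(ExecutionMode.BATCH,
			ShipStrategyType.FORWARD, true) == DataExchangeMode.BATCH;
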
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java
new file mode 100644
index 0000000..cae80e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * This test verifies that the data exchange modes are defined for every execution mode.
+ */
+public class DataExchangeModeTest {
+
+	@Test
+	public void testForward() {
+		for (ExecutionMode mode : ExecutionMode.values()) {
+			assertNotNull(DataExchangeMode.getForForwardExchange(mode));
+		}
+	}
+
+	@Test
+	public void testShuffleAndBroadcast() {
+		for (ExecutionMode mode : ExecutionMode.values()) {
+			assertNotNull(DataExchangeMode.getForShuffleOrBroadcast(mode));
+		}
+	}
+
+	@Test
+	public void testPipelineBreaking() {
+		for (ExecutionMode mode : ExecutionMode.values()) {
+			assertNotNull(DataExchangeMode.getPipelineBreakingExchange(mode));
+		}
+	}
+}
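
The null checks above only guard that the ordinal-indexed tables cover every execution mode. A companion test could additionally pin the concrete matrix (a sketch, not part of this commit; expected values taken from the static initializer in DataExchangeMode):

	assertEquals(DataExchangeMode.PIPELINED,
			DataExchangeMode.getForForwardExchange(ExecutionMode.BATCH));
	assertEquals(DataExchangeMode.BATCH,
			DataExchangeMode.getForShuffleOrBroadcast(ExecutionMode.BATCH));
	assertEquals(DataExchangeMode.BATCH,
			DataExchangeMode.getPipelineBreakingExchange(ExecutionMode.PIPELINED));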