Posted to commits@flink.apache.org by se...@apache.org on 2015/03/17 10:16:25 UTC
[5/8] flink git commit: [FLINK-1671] [optimizer] Add data exchange mode to optimizer classes
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeForwardTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeForwardTest.java
new file mode 100644
index 0000000..e550749
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeForwardTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.compiler.testfunctions.Top1GroupReducer;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test verifies that the optimizer assigns the correct
+ * data exchange mode to a simple forward / shuffle plan.
+ *
+ * <pre>
+ * (source) -> (map) -> (filter) -> (groupBy / reduce)
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class DataExchangeModeForwardTest extends CompilerTestBase {
+
+ @Test
+ public void testPipelinedForced() {
+ // PIPELINED_FORCED should result in pipelining all the way
+ verifySimpleForwardPlan(ExecutionMode.PIPELINED_FORCED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testPipelined() {
+ // PIPELINED should result in pipelining all the way
+ verifySimpleForwardPlan(ExecutionMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testBatch() {
+ // BATCH should result in batching the shuffle all the way
+ verifySimpleForwardPlan(ExecutionMode.BATCH,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testBatchForced() {
+ // BATCH_FORCED should result in batching all the way
+ verifySimpleForwardPlan(ExecutionMode.BATCH_FORCED,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH);
+ }
+
+ private void verifySimpleForwardPlan(ExecutionMode execMode,
+ DataExchangeMode toMap,
+ DataExchangeMode toFilter,
+ DataExchangeMode toKeyExtractor,
+ DataExchangeMode toCombiner,
+ DataExchangeMode toReduce,
+ DataExchangeMode toSink)
+ {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setExecutionMode(execMode);
+
+ DataSet<String> dataSet = env.readTextFile("/never/accessed");
+ dataSet
+ .map(new MapFunction<String, Integer>() {
+ @Override
+ public Integer map(String value) {
+ return 0;
+ }
+ })
+ .filter(new FilterFunction<Integer>() {
+ @Override
+ public boolean filter(Integer value) {
+ return false;
+ }
+ })
+ .groupBy(new IdentityKeyExtractor<Integer>())
+ .reduceGroup(new Top1GroupReducer<Integer>())
+ .output(new DiscardingOutputFormat<Integer>());
+
+ OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+ SinkPlanNode sinkNode = optPlan.getDataSinks().iterator().next();
+
+ SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getPredecessor();
+ SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode) combineNode.getPredecessor();
+
+ SingleInputPlanNode filterNode = (SingleInputPlanNode) keyExtractorNode.getPredecessor();
+ SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+ assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+ assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+ assertEquals(toKeyExtractor, keyExtractorNode.getInput().getDataExchangeMode());
+ assertEquals(toCombiner, combineNode.getInput().getDataExchangeMode());
+ assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode());
+ assertEquals(toSink, sinkNode.getInput().getDataExchangeMode());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
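
For reference, a minimal user-program sketch (not part of this commit) of the knob these tests exercise: the ExecutionMode set on the ExecutionConfig, which the optimizer translates into per-connection DataExchangeModes. The class name and job name below are illustrative; the API calls are the same ones used in the test above.

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;

    public class ExecutionModeExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // the setting under test; the default is PIPELINED
            env.getConfig().setExecutionMode(ExecutionMode.BATCH);
            env.generateSequence(1, 100)
                .output(new DiscardingOutputFormat<Long>());
            env.execute("execution mode example");
        }
    }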
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeOpenBranchingTest.java
new file mode 100644
index 0000000..e4e0cba
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test checks the correct assignment of the DataExchangeMode to
+ * connections for programs that branch, but do not re-join the branches.
+ *
+ * <pre>
+ * /---> (filter) -> (sink)
+ * /
+ * /
+ * (source) -> (map) -----------------\
+ * \ (join) -> (sink)
+ * \ (source) --/
+ * \
+ * \
+ * \-> (sink)
+ * </pre>
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
+
+ @Test
+ public void testPipelinedForced() {
+ // PIPELINED_FORCED should result in pipelining all the way
+ verifyBranchingPlan(ExecutionMode.PIPELINED_FORCED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testPipelined() {
+ // PIPELINED should result in pipelining all the way
+ verifyBranchingPlan(ExecutionMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testBatch() {
+ // BATCH should result in batching the shuffle all the way
+ verifyBranchingPlan(ExecutionMode.BATCH,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testBatchForced() {
+ // BATCH_FORCED should result in batching all the way
+ verifyBranchingPlan(ExecutionMode.BATCH_FORCED,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH);
+ }
+
+ private void verifyBranchingPlan(ExecutionMode execMode,
+ DataExchangeMode toMap,
+ DataExchangeMode toFilter,
+ DataExchangeMode toFilterSink,
+ DataExchangeMode toJoin1,
+ DataExchangeMode toJoin2,
+ DataExchangeMode toJoinSink,
+ DataExchangeMode toDirectSink)
+ {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setExecutionMode(execMode);
+
+ DataSet<Tuple2<Long, Long>> data = env.generateSequence(1, 100000)
+ .map(new MapFunction<Long, Tuple2<Long, Long>>() {
+ @Override
+ public Tuple2<Long, Long> map(Long value) {
+ return new Tuple2<Long, Long>(value, value);
+ }
+ });
+
+ // output 1
+ data
+ .filter(new FilterFunction<Tuple2<Long, Long>>() {
+ @Override
+ public boolean filter(Tuple2<Long, Long> value) {
+ return false;
+ }
+ })
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink1");
+
+ // output 2 does a join before the sink
+ data
+ .join(env.fromElements(new Tuple2<Long, Long>(1L, 2L)))
+ .where(1)
+ .equalTo(0)
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("sink2");
+
+ // output 3 is direct
+ data
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink3");
+
+ OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+
+ SinkPlanNode filterSink = findSink(optPlan.getDataSinks(), "sink1");
+ SinkPlanNode joinSink = findSink(optPlan.getDataSinks(), "sink2");
+ SinkPlanNode directSink = findSink(optPlan.getDataSinks(), "sink3");
+
+ SingleInputPlanNode filterNode = (SingleInputPlanNode) filterSink.getPredecessor();
+ SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+ DualInputPlanNode joinNode = (DualInputPlanNode) joinSink.getPredecessor();
+ assertEquals(mapNode, joinNode.getInput1().getSource());
+
+ assertEquals(mapNode, directSink.getPredecessor());
+
+ assertEquals(toFilterSink, filterSink.getInput().getDataExchangeMode());
+ assertEquals(toJoinSink, joinSink.getInput().getDataExchangeMode());
+ assertEquals(toDirectSink, directSink.getInput().getDataExchangeMode());
+
+ assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+ assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+
+ assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode());
+ assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
+ for (SinkPlanNode node : collection) {
+ String nodeName = node.getOptimizerNode().getPactContract().getName();
+ if (nodeName != null && nodeName.equals(name)) {
+ return node;
+ }
+ }
+
+ throw new IllegalArgumentException("No node with that name was found.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/PipelineBreakingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/PipelineBreakingTest.java
new file mode 100644
index 0000000..f6a77c6
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/PipelineBreakingTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataexchange;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.PactCompiler;
+import org.apache.flink.compiler.dag.DataSinkNode;
+import org.apache.flink.compiler.dag.OptimizerNode;
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dag.SinkJoiner;
+import org.apache.flink.compiler.dag.TwoInputNode;
+import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.compiler.testfunctions.IdentityFlatMapper;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.compiler.testfunctions.SelectOneReducer;
+import org.apache.flink.compiler.testfunctions.Top1GroupReducer;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test checks whether connections are correctly marked as pipeline breaking.
+ */
+@SuppressWarnings("serial")
+public class PipelineBreakingTest {
+
+ /**
+ * Tests that no pipeline breakers are inserted into a simple forward
+ * pipeline.
+ *
+ * <pre>
+ * (source) -> (map) -> (filter) -> (groupBy / reduce)
+ * </pre>
+ */
+ @Test
+ public void testSimpleForwardPlan() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> dataSet = env.readTextFile("/never/accessed");
+ dataSet
+ .map(new MapFunction<String, Integer>() {
+ @Override
+ public Integer map(String value) {
+ return 0;
+ }
+ })
+ .filter(new FilterFunction<Integer>() {
+ @Override
+ public boolean filter(Integer value) {
+ return false;
+ }
+ })
+ .groupBy(new IdentityKeyExtractor<Integer>())
+ .reduceGroup(new Top1GroupReducer<Integer>())
+ .output(new DiscardingOutputFormat<Integer>());
+
+ DataSinkNode sinkNode = convertPlan(env.createProgramPlan()).get(0);
+
+ SingleInputNode reduceNode = (SingleInputNode) sinkNode.getPredecessorNode();
+ SingleInputNode keyExtractorNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+ SingleInputNode filterNode = (SingleInputNode) keyExtractorNode.getPredecessorNode();
+ SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+ assertFalse(sinkNode.getInputConnection().isBreakingPipeline());
+ assertFalse(reduceNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(keyExtractorNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Tests that branching plans, where the branches are not re-joined,
+ * do not place pipeline breakers.
+ *
+ * <pre>
+ * /---> (filter) -> (sink)
+ * /
+ * /
+ * (source) -> (map) -----------------\
+ * \ (join) -> (sink)
+ * \ (source) --/
+ * \
+ * \
+ * \-> (sink)
+ * </pre>
+ */
+ @Test
+ public void testBranchingPlanNotReJoined() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> data = env.readTextFile("/never/accessed")
+ .map(new MapFunction<String, Integer>() {
+ @Override
+ public Integer map(String value) {
+ return 0;
+ }
+ });
+
+ // output 1
+ data
+ .filter(new FilterFunction<Integer>() {
+ @Override
+ public boolean filter(Integer value) {
+ return false;
+ }
+ })
+ .output(new DiscardingOutputFormat<Integer>());
+
+ // output 2 does a join before the sink
+ data
+ .join(env.fromElements(1, 2, 3, 4))
+ .where(new IdentityKeyExtractor<Integer>())
+ .equalTo(new IdentityKeyExtractor<Integer>())
+ .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+ // output 3 is direct
+ data
+ .output(new DiscardingOutputFormat<Integer>());
+
+ List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+ // gather the optimizer DAG nodes
+
+ DataSinkNode sinkAfterFilter = sinks.get(0);
+ DataSinkNode sinkAfterJoin = sinks.get(1);
+ DataSinkNode sinkDirect = sinks.get(2);
+
+ SingleInputNode filterNode = (SingleInputNode) sinkAfterFilter.getPredecessorNode();
+ SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+ TwoInputNode joinNode = (TwoInputNode) sinkAfterJoin.getPredecessorNode();
+ SingleInputNode joinInput = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+ // verify the non-pipeline breaking status
+
+ assertFalse(sinkAfterFilter.getInputConnection().isBreakingPipeline());
+ assertFalse(sinkAfterJoin.getInputConnection().isBreakingPipeline());
+ assertFalse(sinkDirect.getInputConnection().isBreakingPipeline());
+
+ assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+
+ assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+ assertFalse(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+ assertFalse(joinInput.getIncomingConnection().isBreakingPipeline());
+
+ // some other sanity checks on the plan construction (cannot hurt)
+
+ assertEquals(mapNode, ((SingleInputNode) joinNode.getFirstPredecessorNode()).getPredecessorNode());
+ assertEquals(mapNode, sinkDirect.getPredecessorNode());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Tests that branches that are re-joined have pipeline breakers placed.
+ *
+ * <pre>
+ * /-> (sink)
+ * /
+ * /-> (reduce) -+ /-> (flatmap) -> (sink)
+ * / \ /
+ * (source) -> (map) - (join) -+-----\
+ * \ / \
+ * \-> (filter) -+ \
+ * \ (co group) -> (sink)
+ * \ /
+ * \-> (reduce) - /
+ * </pre>
+ */
+ @Test
+ public void testReJoinedBranches() {
+ try {
+ // build a test program
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
+ .map(new MapFunction<Long, Tuple2<Long, Long>>() {
+ @Override
+ public Tuple2<Long, Long> map(Long value) {
+ return new Tuple2<Long, Long>(value, value);
+ }
+ });
+
+ DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
+ reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() {
+ @Override
+ public boolean filter(Tuple2<Long, Long> value) throws Exception {
+ return false;
+ }
+ });
+
+ DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered)
+ .where(1).equalTo(1)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>())
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+ joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()))
+ .where(0).equalTo(0)
+ .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+ List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+ // gather the optimizer DAG nodes
+
+ DataSinkNode sinkAfterReduce = sinks.get(0);
+ DataSinkNode sinkAfterFlatMap = sinks.get(1);
+ DataSinkNode sinkAfterCoGroup = sinks.get(2);
+
+ SingleInputNode reduceNode = (SingleInputNode) sinkAfterReduce.getPredecessorNode();
+ SingleInputNode mapNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+ SingleInputNode flatMapNode = (SingleInputNode) sinkAfterFlatMap.getPredecessorNode();
+ TwoInputNode joinNode = (TwoInputNode) flatMapNode.getPredecessorNode();
+ SingleInputNode filterNode = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+ TwoInputNode coGroupNode = (TwoInputNode) sinkAfterCoGroup.getPredecessorNode();
+ SingleInputNode otherReduceNode = (SingleInputNode) coGroupNode.getSecondPredecessorNode();
+
+ // test sanity checks (that we constructed the DAG correctly)
+
+ assertEquals(reduceNode, joinNode.getFirstPredecessorNode());
+ assertEquals(mapNode, filterNode.getPredecessorNode());
+ assertEquals(joinNode, coGroupNode.getFirstPredecessorNode());
+ assertEquals(filterNode, otherReduceNode.getPredecessorNode());
+
+ // verify the pipeline breaking status
+
+ assertFalse(sinkAfterReduce.getInputConnection().isBreakingPipeline());
+ assertFalse(sinkAfterFlatMap.getInputConnection().isBreakingPipeline());
+ assertFalse(sinkAfterCoGroup.getInputConnection().isBreakingPipeline());
+
+ assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(flatMapNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+ assertFalse(coGroupNode.getFirstIncomingConnection().isBreakingPipeline());
+ assertFalse(coGroupNode.getSecondIncomingConnection().isBreakingPipeline());
+
+ // these should be pipeline breakers
+ assertTrue(reduceNode.getIncomingConnection().isBreakingPipeline());
+ assertTrue(filterNode.getIncomingConnection().isBreakingPipeline());
+ assertTrue(otherReduceNode.getIncomingConnection().isBreakingPipeline());
+ assertTrue(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static List<DataSinkNode> convertPlan(Plan p) {
+ PactCompiler.GraphCreatingVisitor dagCreator =
+ new PactCompiler.GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
+
+ // create the DAG
+ p.accept(dagCreator);
+ List<DataSinkNode> sinks = dagCreator.getSinks();
+
+ // build a single root and run the branch tracking logic
+ OptimizerNode rootNode;
+ if (sinks.size() == 1) {
+ rootNode = sinks.get(0);
+ }
+ else {
+ Iterator<DataSinkNode> iter = sinks.iterator();
+ rootNode = iter.next();
+
+ while (iter.hasNext()) {
+ rootNode = new SinkJoiner(rootNode, iter.next());
+ }
+ }
+ rootNode.accept(new PactCompiler.IdAndEstimatesVisitor(null));
+ rootNode.accept(new PactCompiler.BranchesVisitor());
+
+ return sinks;
+ }
+}
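
For readers following along, a standalone sketch (not part of this commit) of what the convertPlan(...) helper above drives: only the optimizer's DAG-creation front end, without running the full compiler. The parallelism of 17 is the same arbitrary value the helper uses; the class name is illustrative, and all API calls are taken from the helper itself.

    import java.util.List;

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.compiler.PactCompiler;
    import org.apache.flink.compiler.dag.DataSinkNode;

    public class DagFrontEndSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.generateSequence(1, 10)
                .output(new DiscardingOutputFormat<Long>());
            Plan plan = env.createProgramPlan();

            // build only the optimizer DAG, as convertPlan(...) does
            PactCompiler.GraphCreatingVisitor dagCreator =
                new PactCompiler.GraphCreatingVisitor(17, plan.getExecutionConfig().getExecutionMode());
            plan.accept(dagCreator);

            List<DataSinkNode> sinks = dagCreator.getSinks();
            System.out.println("number of sinks: " + sinks.size());
        }
    }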
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityFlatMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityFlatMapper.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityFlatMapper.java
new file mode 100644
index 0000000..5fe32b4
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityFlatMapper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityFlatMapper<T> implements FlatMapFunction<T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(T value, Collector<T> out) {
+ // forward the element unchanged (identity)
+ out.collect(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index e99cac7..f75b797 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.operators;
import java.util.ArrayList;
@@ -26,9 +25,10 @@ import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.types.Key;
/**
- *
+ * This class represents an ordering on a set of fields. It specifies the fields and order direction
+ * (ascending, descending).
*/
-public class Ordering {
+public class Ordering implements Cloneable {
protected FieldList indexes = new FieldList();
@@ -212,9 +212,6 @@ public class Ordering {
}
// --------------------------------------------------------------------------------------------
-
-
-
public Ordering clone() {
Ordering newOrdering = new Ordering();
@@ -223,7 +220,6 @@ public class Ordering {
newOrdering.orders.addAll(this.orders);
return newOrdering;
}
-
@Override
public int hashCode() {
@@ -235,7 +231,6 @@ public class Ordering {
return result;
}
-
@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -273,7 +268,7 @@ public class Ordering {
}
public String toString() {
- final StringBuffer buf = new StringBuffer("[");
+ final StringBuilder buf = new StringBuilder("[");
for (int i = 0; i < indexes.size(); i++) {
if (i != 0) {
buf.append(",");
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java
new file mode 100644
index 0000000..7810c3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * Defines how the data exchange between two specific operators happens.
+ */
+public enum DataExchangeMode {
+
+ /**
+ * The data exchange is streamed, sender and receiver are online at the same time,
+ * and the receiver back-pressures the sender.
+ */
+ PIPELINED,
+
+ /**
+ * The data exchange is decoupled. The sender first produces its entire result and finishes.
+ * After that, the receiver is started and may consume the data.
+ */
+ BATCH,
+
+ /**
+ * The data exchange starts as in {@link #PIPELINED} and falls back to {@link #BATCH}
+ * for recovery runs.
+ */
+ PIPELINE_WITH_BATCH_FALLBACK;
+
+ // ------------------------------------------------------------------------
+
+ public static DataExchangeMode getForForwardExchange(ExecutionMode mode) {
+ return FORWARD[mode.ordinal()];
+ }
+
+ public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode) {
+ return SHUFFLE[mode.ordinal()];
+ }
+
+ public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode) {
+ return BREAKING[mode.ordinal()];
+ }
+
+ /**
+ * Computes the mode of data exchange to be used for a connection, given an execution mode and ship strategy.
+ * The type of the data exchange also depends on whether this connection has been identified as requiring
+ * a pipeline break for deadlock avoidance.
+ * <ul>
+ * <li>If the connection is set to be pipeline breaking, this returns the pipeline breaking variant
+ * of the execution mode
+ * {@link org.apache.flink.runtime.io.network.DataExchangeMode#getPipelineBreakingExchange(org.apache.flink.api.common.ExecutionMode)}.
+ * </li>
+ * <li>If the data exchange is a simple FORWARD (one-to-one communication), this returns
+ * {@link org.apache.flink.runtime.io.network.DataExchangeMode#getForForwardExchange(org.apache.flink.api.common.ExecutionMode)}.
+ * </li>
+ * <li>Otherwise, this returns
+ * {@link org.apache.flink.runtime.io.network.DataExchangeMode#getForShuffleOrBroadcast(org.apache.flink.api.common.ExecutionMode)}.
+ * </li>
+ * </ul>
+ *
+ * @param executionMode The execution mode of the program.
+ * @param shipStrategy The ship strategy (FORWARD, PARTITION, BROADCAST, ...) of the runtime data exchange.
+ * @param breakPipeline Whether this connection has been marked as requiring a pipeline break to avoid deadlocks.
+ * @return The data exchange mode for the connection, given the concrete ship strategy.
+ */
+ public static DataExchangeMode select(ExecutionMode executionMode, ShipStrategyType shipStrategy,
+ boolean breakPipeline) {
+
+ if (shipStrategy == null || shipStrategy == ShipStrategyType.NONE) {
+ throw new IllegalArgumentException("shipStrategy may not be null or NONE");
+ }
+ if (executionMode == null) {
+ throw new IllegalArgumentException("executionMode may not mbe null");
+ }
+
+ if (breakPipeline) {
+ return getPipelineBreakingExchange(executionMode);
+ }
+ else if (shipStrategy == ShipStrategyType.FORWARD) {
+ return getForForwardExchange(executionMode);
+ }
+ else {
+ return getForShuffleOrBroadcast(executionMode);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final DataExchangeMode[] FORWARD = new DataExchangeMode[ExecutionMode.values().length];
+
+ private static final DataExchangeMode[] SHUFFLE = new DataExchangeMode[ExecutionMode.values().length];
+
+ private static final DataExchangeMode[] BREAKING = new DataExchangeMode[ExecutionMode.values().length];
+
+ // initialize the mapping between execution modes and exchange modes
+ static {
+ FORWARD[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
+ SHUFFLE[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
+ BREAKING[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
+
+ FORWARD[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
+ SHUFFLE[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
+ BREAKING[ExecutionMode.PIPELINED.ordinal()] = BATCH;
+
+ FORWARD[ExecutionMode.BATCH.ordinal()] = PIPELINED;
+ SHUFFLE[ExecutionMode.BATCH.ordinal()] = BATCH;
+ BREAKING[ExecutionMode.BATCH.ordinal()] = BATCH;
+
+ FORWARD[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
+ SHUFFLE[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
+ BREAKING[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
+ }
+}
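
A quick sanity sketch (not part of the commit) of how the lookup tables above resolve through select(...). The expected results follow from the static initializer; the class name is illustrative, and ShipStrategyType values are from the existing runtime enum.

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.runtime.io.network.DataExchangeMode;
    import org.apache.flink.runtime.operators.shipping.ShipStrategyType;

    public class SelectExample {
        public static void main(String[] args) {
            // forward channels stay pipelined under BATCH
            System.out.println(DataExchangeMode.select(
                ExecutionMode.BATCH, ShipStrategyType.FORWARD, false));        // PIPELINED
            // shuffles are batched under BATCH
            System.out.println(DataExchangeMode.select(
                ExecutionMode.BATCH, ShipStrategyType.PARTITION_HASH, false)); // BATCH
            // a pipeline-breaking connection is batched even under PIPELINED
            System.out.println(DataExchangeMode.select(
                ExecutionMode.PIPELINED, ShipStrategyType.FORWARD, true));     // BATCH
        }
    }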
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java
new file mode 100644
index 0000000..cae80e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * This test verifies that the data exchange modes are defined for every execution mode.
+ */
+public class DataExchangeModeTest {
+
+ @Test
+ public void testForward() {
+ for (ExecutionMode mode : ExecutionMode.values()) {
+ assertNotNull(DataExchangeMode.getForForwardExchange(mode));
+ }
+ }
+
+ @Test
+ public void testShuffleAndBroadcast() {
+ for (ExecutionMode mode : ExecutionMode.values()) {
+ assertNotNull(DataExchangeMode.getForShuffleOrBroadcast(mode));
+ }
+ }
+
+ @Test
+ public void testPipelineBreaking() {
+ for (ExecutionMode mode : ExecutionMode.values()) {
+ assertNotNull(DataExchangeMode.getPipelineBreakingExchange(mode));
+ }
+ }
+}