You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:42 UTC
[03/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
new file mode 100644
index 0000000..5175d8c
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test verifies that the optimizer assigns the correct
+ * data exchange mode to a simple forward / shuffle plan.
+ *
+ * <pre>
+ * (source) -> (map) -> (filter) -> (groupBy / reduce)
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class DataExchangeModeForwardTest extends CompilerTestBase {
+
+
+ @Test
+ public void testPipelinedForced() {
+ // PIPELINED_FORCED should result in pipelining all the way
+ verifySimpleForwardPlan(ExecutionMode.PIPELINED_FORCED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testPipelined() {
+ // PIPELINED should result in pipelining all the way
+ verifySimpleForwardPlan(ExecutionMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testBatch() {
+ // BATCH should result in batching the shuffle all the way
+ verifySimpleForwardPlan(ExecutionMode.BATCH,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testBatchForced() {
+ // BATCH_FORCED should result in batching all the way
+ verifySimpleForwardPlan(ExecutionMode.BATCH_FORCED,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH);
+ }
+
+ private void verifySimpleForwardPlan(ExecutionMode execMode,
+ DataExchangeMode toMap,
+ DataExchangeMode toFilter,
+ DataExchangeMode toKeyExtractor,
+ DataExchangeMode toCombiner,
+ DataExchangeMode toReduce,
+ DataExchangeMode toSink)
+ {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setExecutionMode(execMode);
+
+ DataSet<String> dataSet = env.readTextFile("/never/accessed");
+ dataSet
+ .map(new MapFunction<String, Integer>() {
+ @Override
+ public Integer map(String value) {
+ return 0;
+ }
+ })
+ .filter(new FilterFunction<Integer>() {
+ @Override
+ public boolean filter(Integer value) {
+ return false;
+ }
+ })
+ .groupBy(new IdentityKeyExtractor<Integer>())
+ .reduceGroup(new Top1GroupReducer<Integer>())
+ .output(new DiscardingOutputFormat<Integer>());
+
+ OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+ SinkPlanNode sinkNode = optPlan.getDataSinks().iterator().next();
+
+ SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getPredecessor();
+ SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode) combineNode.getPredecessor();
+
+ SingleInputPlanNode filterNode = (SingleInputPlanNode) keyExtractorNode.getPredecessor();
+ SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+ assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+ assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+ assertEquals(toKeyExtractor, keyExtractorNode.getInput().getDataExchangeMode());
+ assertEquals(toCombiner, combineNode.getInput().getDataExchangeMode());
+ assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode());
+ assertEquals(toSink, sinkNode.getInput().getDataExchangeMode());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
new file mode 100644
index 0000000..6b2691a
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test checks the correct assignment of the DataExchangeMode to
+ * connections for programs that branch, but do not re-join the branches.
+ *
+ * <pre>
+ * /---> (filter) -> (sink)
+ * /
+ * /
+ * (source) -> (map) -----------------\
+ * \ (join) -> (sink)
+ * \ (source) --/
+ * \
+ * \
+ * \-> (sink)
+ * </pre>
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
+
+ @Test
+ public void testPipelinedForced() {
+ // PIPELINED_FORCED should result in pipelining all the way
+ verifyBranchigPlan(ExecutionMode.PIPELINED_FORCED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testPipelined() {
+ // PIPELINED should result in pipelining all the way
+ verifyBranchigPlan(ExecutionMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testBatch() {
+ // BATCH should result in batching the shuffle all the way
+ verifyBranchigPlan(ExecutionMode.BATCH,
+ DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+ DataExchangeMode.PIPELINED);
+ }
+
+ @Test
+ public void testBatchForced() {
+ // BATCH_FORCED should result in batching all the way
+ verifyBranchigPlan(ExecutionMode.BATCH_FORCED,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+ DataExchangeMode.BATCH);
+ }
+
+ private void verifyBranchigPlan(ExecutionMode execMode,
+ DataExchangeMode toMap,
+ DataExchangeMode toFilter,
+ DataExchangeMode toFilterSink,
+ DataExchangeMode toJoin1,
+ DataExchangeMode toJoin2,
+ DataExchangeMode toJoinSink,
+ DataExchangeMode toDirectSink)
+ {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setExecutionMode(execMode);
+
+ DataSet<Tuple2<Long, Long>> data = env.generateSequence(1, 100000)
+ .map(new MapFunction<Long, Tuple2<Long, Long>>() {
+ @Override
+ public Tuple2<Long, Long> map(Long value) {
+ return new Tuple2<Long, Long>(value, value);
+ }
+ });
+
+ // output 1
+ data
+ .filter(new FilterFunction<Tuple2<Long, Long>>() {
+ @Override
+ public boolean filter(Tuple2<Long, Long> value) {
+ return false;
+ }
+ })
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink1");
+
+ // output 2 does a join before a join
+ data
+ .join(env.fromElements(new Tuple2<Long, Long>(1L, 2L)))
+ .where(1)
+ .equalTo(0)
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("sink2");
+
+ // output 3 is direct
+ data
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink3");
+
+ OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+
+ SinkPlanNode filterSink = findSink(optPlan.getDataSinks(), "sink1");
+ SinkPlanNode joinSink = findSink(optPlan.getDataSinks(), "sink2");
+ SinkPlanNode directSink = findSink(optPlan.getDataSinks(), "sink3");
+
+ SingleInputPlanNode filterNode = (SingleInputPlanNode) filterSink.getPredecessor();
+ SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+ DualInputPlanNode joinNode = (DualInputPlanNode) joinSink.getPredecessor();
+ assertEquals(mapNode, joinNode.getInput1().getSource());
+
+ assertEquals(mapNode, directSink.getPredecessor());
+
+ assertEquals(toFilterSink, filterSink.getInput().getDataExchangeMode());
+ assertEquals(toJoinSink, joinSink.getInput().getDataExchangeMode());
+ assertEquals(toDirectSink, directSink.getInput().getDataExchangeMode());
+
+ assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+ assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+
+ assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode());
+ assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
+ for (SinkPlanNode node : collection) {
+ String nodeName = node.getOptimizerNode().getOperator().getName();
+ if (nodeName != null && nodeName.equals(name)) {
+ return node;
+ }
+ }
+
+ throw new IllegalArgumentException("No node with that name was found.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
new file mode 100644
index 0000000..1a14be5
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.traversals.BranchesVisitor;
+import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
+import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * This test checks whether connections are correctly marked as pipelined breaking.
+ */
+@SuppressWarnings("serial")
+public class PipelineBreakingTest {
+
+ /**
+ * Tests that no pipeline breakers are inserted into a simple forward
+ * pipeline.
+ *
+ * <pre>
+ * (source) -> (map) -> (filter) -> (groupBy / reduce)
+ * </pre>
+ */
+ @Test
+ public void testSimpleForwardPlan() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> dataSet = env.readTextFile("/never/accessed");
+ dataSet
+ .map(new MapFunction<String, Integer>() {
+ @Override
+ public Integer map(String value) {
+ return 0;
+ }
+ })
+ .filter(new FilterFunction<Integer>() {
+ @Override
+ public boolean filter(Integer value) {
+ return false;
+ }
+ })
+ .groupBy(new IdentityKeyExtractor<Integer>())
+ .reduceGroup(new Top1GroupReducer<Integer>())
+ .output(new DiscardingOutputFormat<Integer>());
+
+ DataSinkNode sinkNode = convertPlan(env.createProgramPlan()).get(0);
+
+ SingleInputNode reduceNode = (SingleInputNode) sinkNode.getPredecessorNode();
+ SingleInputNode keyExtractorNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+ SingleInputNode filterNode = (SingleInputNode) keyExtractorNode.getPredecessorNode();
+ SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+ assertFalse(sinkNode.getInputConnection().isBreakingPipeline());
+ assertFalse(reduceNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(keyExtractorNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Tests that branching plans, where the branches are not re-joined,
+ * do not place pipeline breakers.
+ *
+ * <pre>
+ * /---> (filter) -> (sink)
+ * /
+ * /
+ * (source) -> (map) -----------------\
+ * \ (join) -> (sink)
+ * \ (source) --/
+ * \
+ * \
+ * \-> (sink)
+ * </pre>
+ */
+ @Test
+ public void testBranchingPlanNotReJoined() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> data = env.readTextFile("/never/accessed")
+ .map(new MapFunction<String, Integer>() {
+ @Override
+ public Integer map(String value) {
+ return 0;
+ }
+ });
+
+ // output 1
+ data
+ .filter(new FilterFunction<Integer>() {
+ @Override
+ public boolean filter(Integer value) {
+ return false;
+ }
+ })
+ .output(new DiscardingOutputFormat<Integer>());
+
+ // output 2 does a join before a join
+ data
+ .join(env.fromElements(1, 2, 3, 4))
+ .where(new IdentityKeyExtractor<Integer>())
+ .equalTo(new IdentityKeyExtractor<Integer>())
+ .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+ // output 3 is direct
+ data
+ .output(new DiscardingOutputFormat<Integer>());
+
+ List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+ // gather the optimizer DAG nodes
+
+ DataSinkNode sinkAfterFilter = sinks.get(0);
+ DataSinkNode sinkAfterJoin = sinks.get(1);
+ DataSinkNode sinkDirect = sinks.get(2);
+
+ SingleInputNode filterNode = (SingleInputNode) sinkAfterFilter.getPredecessorNode();
+ SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+ TwoInputNode joinNode = (TwoInputNode) sinkAfterJoin.getPredecessorNode();
+ SingleInputNode joinInput = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+ // verify the non-pipeline breaking status
+
+ assertFalse(sinkAfterFilter.getInputConnection().isBreakingPipeline());
+ assertFalse(sinkAfterJoin.getInputConnection().isBreakingPipeline());
+ assertFalse(sinkDirect.getInputConnection().isBreakingPipeline());
+
+ assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+
+ assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+ assertFalse(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+ assertFalse(joinInput.getIncomingConnection().isBreakingPipeline());
+
+ // some other sanity checks on the plan construction (cannot hurt)
+
+ assertEquals(mapNode, ((SingleInputNode) joinNode.getFirstPredecessorNode()).getPredecessorNode());
+ assertEquals(mapNode, sinkDirect.getPredecessorNode());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Tests that branches that are re-joined have place pipeline breakers.
+ *
+ * <pre>
+ * /-> (sink)
+ * /
+ * /-> (reduce) -+ /-> (flatmap) -> (sink)
+ * / \ /
+ * (source) -> (map) - (join) -+-----\
+ * \ / \
+ * \-> (filter) -+ \
+ * \ (co group) -> (sink)
+ * \ /
+ * \-> (reduce) - /
+ * </pre>
+ */
+ @Test
+ public void testReJoinedBranches() {
+ try {
+ // build a test program
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
+ .map(new MapFunction<Long, Tuple2<Long, Long>>() {
+ @Override
+ public Tuple2<Long, Long> map(Long value) {
+ return new Tuple2<Long, Long>(value, value);
+ }
+ });
+
+ DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
+ reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() {
+ @Override
+ public boolean filter(Tuple2<Long, Long> value) throws Exception {
+ return false;
+ }
+ });
+
+ DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered)
+ .where(1).equalTo(1)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>())
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+ joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()))
+ .where(0).equalTo(0)
+ .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+ List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+ // gather the optimizer DAG nodes
+
+ DataSinkNode sinkAfterReduce = sinks.get(0);
+ DataSinkNode sinkAfterFlatMap = sinks.get(1);
+ DataSinkNode sinkAfterCoGroup = sinks.get(2);
+
+ SingleInputNode reduceNode = (SingleInputNode) sinkAfterReduce.getPredecessorNode();
+ SingleInputNode mapNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+ SingleInputNode flatMapNode = (SingleInputNode) sinkAfterFlatMap.getPredecessorNode();
+ TwoInputNode joinNode = (TwoInputNode) flatMapNode.getPredecessorNode();
+ SingleInputNode filterNode = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+ TwoInputNode coGroupNode = (TwoInputNode) sinkAfterCoGroup.getPredecessorNode();
+ SingleInputNode otherReduceNode = (SingleInputNode) coGroupNode.getSecondPredecessorNode();
+
+ // test sanity checks (that we constructed the DAG correctly)
+
+ assertEquals(reduceNode, joinNode.getFirstPredecessorNode());
+ assertEquals(mapNode, filterNode.getPredecessorNode());
+ assertEquals(joinNode, coGroupNode.getFirstPredecessorNode());
+ assertEquals(filterNode, otherReduceNode.getPredecessorNode());
+
+ // verify the pipeline breaking status
+
+ assertFalse(sinkAfterReduce.getInputConnection().isBreakingPipeline());
+ assertFalse(sinkAfterFlatMap.getInputConnection().isBreakingPipeline());
+ assertFalse(sinkAfterCoGroup.getInputConnection().isBreakingPipeline());
+
+ assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(flatMapNode.getIncomingConnection().isBreakingPipeline());
+ assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+ assertFalse(coGroupNode.getFirstIncomingConnection().isBreakingPipeline());
+ assertFalse(coGroupNode.getSecondIncomingConnection().isBreakingPipeline());
+
+ // these should be pipeline breakers
+ assertTrue(reduceNode.getIncomingConnection().isBreakingPipeline());
+ assertTrue(filterNode.getIncomingConnection().isBreakingPipeline());
+ assertTrue(otherReduceNode.getIncomingConnection().isBreakingPipeline());
+ assertTrue(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static List<DataSinkNode> convertPlan(Plan p) {
+ GraphCreatingVisitor dagCreator =
+ new GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
+
+ // create the DAG
+ p.accept(dagCreator);
+ List<DataSinkNode> sinks = dagCreator.getSinks();
+
+ // build a single root and run the branch tracking logic
+ OptimizerNode rootNode;
+ if (sinks.size() == 1) {
+ rootNode = sinks.get(0);
+ }
+ else {
+ Iterator<DataSinkNode> iter = sinks.iterator();
+ rootNode = iter.next();
+
+ while (iter.hasNext()) {
+ rootNode = new SinkJoiner(rootNode, iter.next());
+ }
+ }
+ rootNode.accept(new IdAndEstimatesVisitor(null));
+ rootNode.accept(new BranchesVisitor());
+
+ return sinks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..3e32905
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class GlobalPropertiesFilteringTest {
+
+ private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+ new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ @Test
+ public void testAllErased1() {
+
+ SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setHashPartitioned(new FieldList(0, 1));
+ gprops.addUniqueFieldCombination(new FieldSet(3, 4));
+ gprops.addUniqueFieldCombination(new FieldSet(5, 6));
+
+ GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
+
+ assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+ assertNull(result.getPartitioningFields());
+ assertNull(result.getPartitioningOrdering());
+ assertNull(result.getUniqueFieldCombination());
+ }
+
+ @Test
+ public void testAllErased2() {
+
+ SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[]{"2"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setHashPartitioned(new FieldList(0, 1));
+ gprops.addUniqueFieldCombination(new FieldSet(3, 4));
+ gprops.addUniqueFieldCombination(new FieldSet(5, 6));
+
+ GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
+
+ assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+ assertNull(result.getPartitioningFields());
+ assertNull(result.getPartitioningOrdering());
+ assertNull(result.getUniqueFieldCombination());
+ }
+
+ @Test
+ public void testHashPartitioningPreserved1() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setHashPartitioned(new FieldList(0, 1, 4));
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
+ FieldList pFields = result.getPartitioningFields();
+ assertEquals(3, pFields.size());
+ assertTrue(pFields.contains(0));
+ assertTrue(pFields.contains(1));
+ assertTrue(pFields.contains(4));
+ }
+
+ @Test
+ public void testHashPartitioningPreserved2() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setHashPartitioned(new FieldList(0, 1, 4));
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
+ FieldList pFields = result.getPartitioningFields();
+ assertEquals(3, pFields.size());
+ assertTrue(pFields.contains(1));
+ assertTrue(pFields.contains(2));
+ assertTrue(pFields.contains(3));
+ }
+
+ @Test
+ public void testHashPartitioningErased() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setHashPartitioned(new FieldList(0, 1, 4));
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+ assertNull(result.getPartitioningFields());
+ }
+
+ @Test
+ public void testAnyPartitioningPreserved1() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setAnyPartitioning(new FieldList(0, 1, 4));
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
+ FieldList pFields = result.getPartitioningFields();
+ assertEquals(3, pFields.size());
+ assertTrue(pFields.contains(0));
+ assertTrue(pFields.contains(1));
+ assertTrue(pFields.contains(4));
+ }
+
+ @Test
+ public void testAnyPartitioningPreserved2() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setAnyPartitioning(new FieldList(0, 1, 4));
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
+ FieldList pFields = result.getPartitioningFields();
+ assertEquals(3, pFields.size());
+ assertTrue(pFields.contains(1));
+ assertTrue(pFields.contains(2));
+ assertTrue(pFields.contains(3));
+ }
+
+ @Test
+ public void testAnyPartitioningErased() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setAnyPartitioning(new FieldList(0, 1, 4));
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+ assertNull(result.getPartitioningFields());
+ }
+
+ @Test
+ public void testCustomPartitioningPreserved1() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
+ gprops.setCustomPartitioned(new FieldList(0, 4), myP);
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning());
+ FieldList pFields = result.getPartitioningFields();
+ assertEquals(2, pFields.size());
+ assertTrue(pFields.contains(0));
+ assertTrue(pFields.contains(4));
+ assertEquals(myP, result.getCustomPartitioner());
+ }
+
+ @Test
+ public void testCustomPartitioningPreserved2() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
+ gprops.setCustomPartitioned(new FieldList(0, 4), myP);
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning());
+ FieldList pFields = result.getPartitioningFields();
+ assertEquals(2, pFields.size());
+ assertTrue(pFields.contains(1));
+ assertTrue(pFields.contains(3));
+ assertEquals(myP, result.getCustomPartitioner());
+ }
+
+ @Test
+ public void testCustomPartitioningErased() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
+ gprops.setCustomPartitioned(new FieldList(0, 4), myP);
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+ assertNull(result.getPartitioningFields());
+ assertNull(result.getCustomPartitioner());
+ }
+
+ @Test
+ public void testRangePartitioningPreserved1() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;2;5"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(2, StringValue.class, Order.ASCENDING);
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setRangePartitioned(o);
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning());
+ FieldList pFields = result.getPartitioningFields();
+ assertEquals(3, pFields.size());
+ assertEquals(1, pFields.get(0).intValue());
+ assertEquals(5, pFields.get(1).intValue());
+ assertEquals(2, pFields.get(2).intValue());
+ Ordering pOrder = result.getPartitioningOrdering();
+ assertEquals(3, pOrder.getNumberOfFields());
+ assertEquals(1, pOrder.getFieldNumber(0).intValue());
+ assertEquals(5, pOrder.getFieldNumber(1).intValue());
+ assertEquals(2, pOrder.getFieldNumber(2).intValue());
+ assertEquals(Order.ASCENDING, pOrder.getOrder(0));
+ assertEquals(Order.DESCENDING, pOrder.getOrder(1));
+ assertEquals(Order.ASCENDING, pOrder.getOrder(2));
+ assertEquals(IntValue.class, pOrder.getType(0));
+ assertEquals(LongValue.class, pOrder.getType(1));
+ assertEquals(StringValue.class, pOrder.getType(2));
+ }
+
+ @Test
+ public void testRangePartitioningPreserved2() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1->3; 2->0; 5->1"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(2, StringValue.class, Order.ASCENDING);
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setRangePartitioned(o);
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning());
+ FieldList pFields = result.getPartitioningFields();
+ assertEquals(3, pFields.size());
+ assertEquals(3, pFields.get(0).intValue());
+ assertEquals(1, pFields.get(1).intValue());
+ assertEquals(0, pFields.get(2).intValue());
+ Ordering pOrder = result.getPartitioningOrdering();
+ assertEquals(3, pOrder.getNumberOfFields());
+ assertEquals(3, pOrder.getFieldNumber(0).intValue());
+ assertEquals(1, pOrder.getFieldNumber(1).intValue());
+ assertEquals(0, pOrder.getFieldNumber(2).intValue());
+ assertEquals(Order.ASCENDING, pOrder.getOrder(0));
+ assertEquals(Order.DESCENDING, pOrder.getOrder(1));
+ assertEquals(Order.ASCENDING, pOrder.getOrder(2));
+ assertEquals(IntValue.class, pOrder.getType(0));
+ assertEquals(LongValue.class, pOrder.getType(1));
+ assertEquals(StringValue.class, pOrder.getType(2));
+ }
+
+ @Test
+ public void testRangePartitioningErased() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;5"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(2, StringValue.class, Order.ASCENDING);
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setRangePartitioned(o);
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+ assertNull(result.getPartitioningOrdering());
+ assertNull(result.getPartitioningFields());
+ }
+
+ @Test
+ public void testRebalancingPreserved() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setForcedRebalanced();
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+ assertEquals(PartitioningProperty.FORCED_REBALANCED, result.getPartitioning());
+ assertNull(result.getPartitioningFields());
+ }
+
+ @Test
+ public void testUniqueFieldGroupsPreserved1() {
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+ FieldSet set1 = new FieldSet(0, 1, 2);
+ FieldSet set2 = new FieldSet(3, 4);
+ FieldSet set3 = new FieldSet(4, 5, 6, 7);
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.addUniqueFieldCombination(set1);
+ gprops.addUniqueFieldCombination(set2);
+ gprops.addUniqueFieldCombination(set3);
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+ Set<FieldSet> unique = result.getUniqueFieldCombination();
+ FieldSet expected1 = new FieldSet(0, 1, 2);
+ FieldSet expected2 = new FieldSet(3, 4);
+
+ Assert.assertTrue(unique.size() == 2);
+ Assert.assertTrue(unique.contains(expected1));
+ Assert.assertTrue(unique.contains(expected2));
+ }
+
+ @Test
+ public void testUniqueFieldGroupsPreserved2() {
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->5;1;2;3->6;4"}, null, null, tupleInfo, tupleInfo);
+
+ FieldSet set1 = new FieldSet(0, 1, 2);
+ FieldSet set2 = new FieldSet(3, 4);
+ FieldSet set3 = new FieldSet(4, 5, 6, 7);
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.addUniqueFieldCombination(set1);
+ gprops.addUniqueFieldCombination(set2);
+ gprops.addUniqueFieldCombination(set3);
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+ Set<FieldSet> unique = result.getUniqueFieldCombination();
+ FieldSet expected1 = new FieldSet(1, 2, 5);
+ FieldSet expected2 = new FieldSet(4, 6);
+
+ Assert.assertTrue(unique.size() == 2);
+ Assert.assertTrue(unique.contains(expected1));
+ Assert.assertTrue(unique.contains(expected2));
+ }
+
+ @Test
+ public void testUniqueFieldGroupsErased() {
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0; 3; 5; 6; 7"}, null, null, tupleInfo, tupleInfo);
+
+ FieldSet set1 = new FieldSet(0, 1, 2);
+ FieldSet set2 = new FieldSet(3, 4);
+ FieldSet set3 = new FieldSet(4, 5, 6, 7);
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.addUniqueFieldCombination(set1);
+ gprops.addUniqueFieldCombination(set2);
+ gprops.addUniqueFieldCombination(set3);
+
+ GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+ Assert.assertNull(result.getUniqueFieldCombination());
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testInvalidInputIndex() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+ GlobalProperties gprops = new GlobalProperties();
+ gprops.setHashPartitioned(new FieldList(0, 1));
+
+ gprops.filterBySemanticProperties(sprops, 1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
new file mode 100644
index 0000000..52826d6
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+public class GlobalPropertiesMatchingTest {
+
+ @Test
+ public void testMatchingAnyPartitioning() {
+ try {
+
+ RequestedGlobalProperties req = new RequestedGlobalProperties();
+ req.setAnyPartitioning(new FieldSet(6, 2));
+
+ // match any partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setAnyPartitioning(new FieldList(2, 6));
+ assertTrue(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setAnyPartitioning(new FieldList(6, 2));
+ assertTrue(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setAnyPartitioning(new FieldList(6, 2, 4));
+ assertFalse(req.isMetBy(gp3));
+
+ GlobalProperties gp4 = new GlobalProperties();
+ gp4.setAnyPartitioning(new FieldList(6, 1));
+ assertFalse(req.isMetBy(gp4));
+
+ GlobalProperties gp5 = new GlobalProperties();
+ gp5.setAnyPartitioning(new FieldList(2));
+ assertTrue(req.isMetBy(gp5));
+ }
+
+ // match hash partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setHashPartitioned(new FieldList(2, 6));
+ assertTrue(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setHashPartitioned(new FieldList(6, 2));
+ assertTrue(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setHashPartitioned(new FieldList(6, 1));
+ assertFalse(req.isMetBy(gp3));
+ }
+
+ // match range partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+ assertTrue(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+ assertTrue(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+ assertFalse(req.isMetBy(gp3));
+
+ GlobalProperties gp4 = new GlobalProperties();
+ gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+ assertTrue(req.isMetBy(gp4));
+ }
+
+ // match custom partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setCustomPartitioned(new FieldList(2, 6), new MockPartitioner());
+ assertTrue(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
+ assertTrue(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setCustomPartitioned(new FieldList(6, 1), new MockPartitioner());
+ assertFalse(req.isMetBy(gp3));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMatchingCustomPartitioning() {
+ try {
+ final Partitioner<Tuple2<Long, Integer>> partitioner = new MockPartitioner();
+
+ RequestedGlobalProperties req = new RequestedGlobalProperties();
+ req.setCustomPartitioned(new FieldSet(6, 2), partitioner);
+
+ // match custom partitionings
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setCustomPartitioned(new FieldList(2, 6), partitioner);
+ assertTrue(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setCustomPartitioned(new FieldList(6, 2), partitioner);
+ assertTrue(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
+ assertFalse(req.isMetBy(gp3));
+ }
+
+ // cannot match other types of partitionings
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setAnyPartitioning(new FieldList(6, 2));
+ assertFalse(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setHashPartitioned(new FieldList(6, 2));
+ assertFalse(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+ assertFalse(req.isMetBy(gp3));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testStrictlyMatchingAnyPartitioning() {
+
+ RequestedGlobalProperties req = new RequestedGlobalProperties();
+ req.setAnyPartitioning(new FieldList(6, 2));
+
+ // match any partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setAnyPartitioning(new FieldList(6, 2));
+ assertTrue(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setAnyPartitioning(new FieldList(2, 6));
+ assertFalse(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setAnyPartitioning(new FieldList(6, 2, 3));
+ assertFalse(req.isMetBy(gp3));
+
+ GlobalProperties gp4 = new GlobalProperties();
+ gp3.setAnyPartitioning(new FieldList(6, 1));
+ assertFalse(req.isMetBy(gp3));
+
+ GlobalProperties gp5 = new GlobalProperties();
+ gp4.setAnyPartitioning(new FieldList(2));
+ assertFalse(req.isMetBy(gp4));
+ }
+
+ // match hash partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setHashPartitioned(new FieldList(6, 2));
+ assertTrue(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setHashPartitioned(new FieldList(2, 6));
+ assertFalse(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setHashPartitioned(new FieldList(6, 1));
+ assertFalse(req.isMetBy(gp3));
+ }
+
+ // match range partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+ assertTrue(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+ assertFalse(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+ assertFalse(req.isMetBy(gp3));
+
+ GlobalProperties gp4 = new GlobalProperties();
+ gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+ assertFalse(req.isMetBy(gp4));
+ }
+
+ }
+
+ @Test
+ public void testStrictlyMatchingHashPartitioning() {
+
+ RequestedGlobalProperties req = new RequestedGlobalProperties();
+ req.setHashPartitioned(new FieldList(6, 2));
+
+ // match any partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setAnyPartitioning(new FieldList(6, 2));
+ assertFalse(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setAnyPartitioning(new FieldList(2, 6));
+ assertFalse(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setAnyPartitioning(new FieldList(6, 1));
+ assertFalse(req.isMetBy(gp3));
+
+ GlobalProperties gp4 = new GlobalProperties();
+ gp4.setAnyPartitioning(new FieldList(2));
+ assertFalse(req.isMetBy(gp4));
+ }
+
+ // match hash partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setHashPartitioned(new FieldList(6, 2));
+ assertTrue(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setHashPartitioned(new FieldList(2, 6));
+ assertFalse(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setHashPartitioned(new FieldList(6, 1));
+ assertFalse(req.isMetBy(gp3));
+
+ GlobalProperties gp4 = new GlobalProperties();
+ gp4.setHashPartitioned(new FieldList(6, 2, 0));
+ assertFalse(req.isMetBy(gp4));
+ }
+
+ // match range partitioning
+ {
+ GlobalProperties gp1 = new GlobalProperties();
+ gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+ assertFalse(req.isMetBy(gp1));
+
+ GlobalProperties gp2 = new GlobalProperties();
+ gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+ assertFalse(req.isMetBy(gp2));
+
+ GlobalProperties gp3 = new GlobalProperties();
+ gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+ assertFalse(req.isMetBy(gp3));
+
+ GlobalProperties gp4 = new GlobalProperties();
+ gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+ assertFalse(req.isMetBy(gp4));
+ }
+
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
new file mode 100644
index 0000000..0868720
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+public class GlobalPropertiesPushdownTest {
+
+ @Test
+ public void testAnyPartitioningPushedDown() {
+ try {
+ RequestedGlobalProperties req = new RequestedGlobalProperties();
+ req.setAnyPartitioning(new FieldSet(3, 1));
+
+ RequestedGlobalProperties preserved = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+ assertEquals(PartitioningProperty.ANY_PARTITIONING, preserved.getPartitioning());
+ assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
+
+ RequestedGlobalProperties nonPreserved = req.filterBySemanticProperties(getNonePreservingSemProps(), 0);
+ assertTrue(nonPreserved == null || nonPreserved.isTrivial());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testHashPartitioningPushedDown() {
+ try {
+ RequestedGlobalProperties req = new RequestedGlobalProperties();
+ req.setHashPartitioned(new FieldSet(3, 1));
+
+ RequestedGlobalProperties preserved = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+ assertEquals(PartitioningProperty.HASH_PARTITIONED, preserved.getPartitioning());
+ assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
+
+ RequestedGlobalProperties nonPreserved = req.filterBySemanticProperties(getNonePreservingSemProps(), 0);
+ assertTrue(nonPreserved == null || nonPreserved.isTrivial());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCustomPartitioningNotPushedDown() {
+ try {
+ RequestedGlobalProperties req = new RequestedGlobalProperties();
+ req.setCustomPartitioned(new FieldSet(3, 1), new MockPartitioner());
+
+ RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+ assertTrue(pushedDown == null || pushedDown.isTrivial());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testForcedReblancingNotPushedDown() {
+ try {
+ RequestedGlobalProperties req = new RequestedGlobalProperties();
+ req.setForceRebalancing();
+
+ RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+ assertTrue(pushedDown == null || pushedDown.isTrivial());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static SemanticProperties getAllPreservingSemProps() {
+ return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
+ }
+
+ private static SemanticProperties getNonePreservingSemProps() {
+ return new SingleInputSemanticProperties();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..1ff62ed
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+public class LocalPropertiesFilteringTest {
+
+ private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+ new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ @Test
+ public void testAllErased1() {
+
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2));
+ lProps = lProps.addUniqueFields(new FieldSet(3,4));
+ lProps = lProps.addUniqueFields(new FieldSet(5,6));
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+ assertNull(filtered.getGroupedFields());
+ assertNull(filtered.getOrdering());
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testAllErased2() {
+
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"5"}, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2));
+ lProps = lProps.addUniqueFields(new FieldSet(3,4));
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+ assertNull(filtered.getGroupedFields());
+ assertNull(filtered.getOrdering());
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testGroupingPreserved1() {
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+ assertNotNull(filtered.getGroupedFields());
+ assertEquals(3, filtered.getGroupedFields().size());
+ assertTrue(filtered.getGroupedFields().contains(0));
+ assertTrue(filtered.getGroupedFields().contains(2));
+ assertTrue(filtered.getGroupedFields().contains(3));
+ assertNull(filtered.getOrdering());
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testGroupingPreserved2() {
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0;3->7"}, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+ assertNotNull(filtered.getGroupedFields());
+ assertEquals(3, filtered.getGroupedFields().size());
+ assertTrue(filtered.getGroupedFields().contains(4));
+ assertTrue(filtered.getGroupedFields().contains(0));
+ assertTrue(filtered.getGroupedFields().contains(7));
+ assertNull(filtered.getOrdering());
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testGroupingErased() {
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0"}, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+ assertNull(filtered.getGroupedFields());
+ assertNull(filtered.getOrdering());
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testSortingPreserved1() {
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;5"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+ o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+ LocalProperties lProps = LocalProperties.forOrdering(o);
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+ FieldList gFields = filtered.getGroupedFields();
+ Ordering order = filtered.getOrdering();
+
+ assertNotNull(gFields);
+ assertEquals(3, gFields.size());
+ assertTrue(gFields.contains(0));
+ assertTrue(gFields.contains(2));
+ assertTrue(gFields.contains(5));
+ assertNotNull(order);
+ assertEquals(3, order.getNumberOfFields());
+ assertEquals(2, order.getFieldNumber(0).intValue());
+ assertEquals(0, order.getFieldNumber(1).intValue());
+ assertEquals(5, order.getFieldNumber(2).intValue());
+ assertEquals(Order.ASCENDING, order.getOrder(0));
+ assertEquals(Order.DESCENDING, order.getOrder(1));
+ assertEquals(Order.DESCENDING, order.getOrder(2));
+ assertEquals(IntValue.class, order.getType(0));
+ assertEquals(StringValue.class, order.getType(1));
+ assertEquals(LongValue.class, order.getType(2));
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testSortingPreserved2() {
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->3;2->7;5->1"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+ o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+ LocalProperties lProps = LocalProperties.forOrdering(o);
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+ FieldList gFields = filtered.getGroupedFields();
+ Ordering order = filtered.getOrdering();
+
+ assertNotNull(gFields);
+ assertEquals(3, gFields.size());
+ assertTrue(gFields.contains(3));
+ assertTrue(gFields.contains(7));
+ assertTrue(gFields.contains(1));
+ assertNotNull(order);
+ assertEquals(3, order.getNumberOfFields());
+ assertEquals(7, order.getFieldNumber(0).intValue());
+ assertEquals(3, order.getFieldNumber(1).intValue());
+ assertEquals(1, order.getFieldNumber(2).intValue());
+ assertEquals(Order.ASCENDING, order.getOrder(0));
+ assertEquals(Order.DESCENDING, order.getOrder(1));
+ assertEquals(Order.DESCENDING, order.getOrder(2));
+ assertEquals(IntValue.class, order.getType(0));
+ assertEquals(StringValue.class, order.getType(1));
+ assertEquals(LongValue.class, order.getType(2));
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testSortingPreserved3() {
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+ o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+ LocalProperties lProps = LocalProperties.forOrdering(o);
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+ FieldList gFields = filtered.getGroupedFields();
+ Ordering order = filtered.getOrdering();
+
+ assertNotNull(gFields);
+ assertEquals(2, gFields.size());
+ assertTrue(gFields.contains(0));
+ assertTrue(gFields.contains(2));
+ assertNotNull(order);
+ assertEquals(2, order.getNumberOfFields());
+ assertEquals(2, order.getFieldNumber(0).intValue());
+ assertEquals(0, order.getFieldNumber(1).intValue());
+ assertEquals(Order.ASCENDING, order.getOrder(0));
+ assertEquals(Order.DESCENDING, order.getOrder(1));
+ assertEquals(IntValue.class, order.getType(0));
+ assertEquals(StringValue.class, order.getType(1));
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testSortingPreserved4() {
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"2->7;5"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+ o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+ LocalProperties lProps = LocalProperties.forOrdering(o);
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+ FieldList gFields = filtered.getGroupedFields();
+ Ordering order = filtered.getOrdering();
+
+ assertNotNull(gFields);
+ assertEquals(1, gFields.size());
+ assertTrue(gFields.contains(7));
+ assertNotNull(order);
+ assertEquals(1, order.getNumberOfFields());
+ assertEquals(7, order.getFieldNumber(0).intValue());
+ assertEquals(Order.ASCENDING, order.getOrder(0));
+ assertEquals(IntValue.class, order.getType(0));
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testSortingErased() {
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;5"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+ o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+ LocalProperties lProps = LocalProperties.forOrdering(o);
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+ FieldList gFields = filtered.getGroupedFields();
+ Ordering order = filtered.getOrdering();
+
+ assertNull(gFields);
+ assertNull(order);
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test
+ public void testUniqueFieldsPreserved1() {
+
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lProps = new LocalProperties();
+ lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+ lProps = lProps.addUniqueFields(new FieldSet(3,4));
+ lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+ FieldSet expected1 = new FieldSet(0,1,2);
+ FieldSet expected2 = new FieldSet(3,4);
+
+ assertNull(filtered.getGroupedFields());
+ assertNull(filtered.getOrdering());
+ assertNotNull(filtered.getUniqueFields());
+ assertEquals(2, filtered.getUniqueFields().size());
+ assertTrue(filtered.getUniqueFields().contains(expected1));
+ assertTrue(filtered.getUniqueFields().contains(expected2));
+ }
+
+ @Test
+ public void testUniqueFieldsPreserved2() {
+
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lProps = LocalProperties.forGrouping(new FieldList(1,2));
+ lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+ lProps = lProps.addUniqueFields(new FieldSet(3,4));
+ lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+ FieldSet expected1 = new FieldSet(0,1,2);
+ FieldSet expected2 = new FieldSet(3,4);
+
+ assertNull(filtered.getOrdering());
+ assertNotNull(filtered.getGroupedFields());
+ assertEquals(2, filtered.getGroupedFields().size());
+ assertTrue(filtered.getGroupedFields().contains(1));
+ assertTrue(filtered.getGroupedFields().contains(2));
+ assertNotNull(filtered.getUniqueFields());
+ assertEquals(2, filtered.getUniqueFields().size());
+ assertTrue(filtered.getUniqueFields().contains(expected1));
+ assertTrue(filtered.getUniqueFields().contains(expected2));
+ }
+
+ @Test
+ public void testUniqueFieldsPreserved3() {
+
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->7;1->6;2->5;3->4;4->3"}, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lProps = new LocalProperties();
+ lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+ lProps = lProps.addUniqueFields(new FieldSet(3,4));
+ lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+ FieldSet expected1 = new FieldSet(5,6,7);
+ FieldSet expected2 = new FieldSet(3,4);
+
+ assertNull(filtered.getGroupedFields());
+ assertNull(filtered.getOrdering());
+ assertNotNull(filtered.getUniqueFields());
+ assertEquals(2, filtered.getUniqueFields().size());
+ assertTrue(filtered.getUniqueFields().contains(expected1));
+ assertTrue(filtered.getUniqueFields().contains(expected2));
+ }
+
+ @Test
+ public void testUniqueFieldsErased() {
+
+ SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lProps = new LocalProperties();
+ lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+ lProps = lProps.addUniqueFields(new FieldSet(3,4));
+ lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+ LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+ assertNull(filtered.getGroupedFields());
+ assertNull(filtered.getOrdering());
+ assertNull(filtered.getUniqueFields());
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testInvalidInputIndex() {
+
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+ LocalProperties lprops = LocalProperties.forGrouping(new FieldList(0,1));
+
+ lprops.filterBySemanticProperties(sprops, 1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
new file mode 100644
index 0000000..74126f8
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Key;
+
+import java.io.IOException;
+
+@SuppressWarnings("serial")
+public class MockDistribution implements DataDistribution {
+
+ @Override
+ public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+ return new Key<?>[0];
+ }
+
+ @Override
+ public int getNumberOfFields() {
+ return 0;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
new file mode 100644
index 0000000..2b2ab14
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+class MockPartitioner implements Partitioner<Tuple2<Long, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int partition(Tuple2<Long, Integer> key, int numPartitions) {
+ return 0;
+ }
+}
\ No newline at end of file