You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:42 UTC

[03/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
new file mode 100644
index 0000000..5175d8c
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test verifies that the optimizer assigns the correct
+ * data exchange mode to a simple forward / shuffle plan.
+ *
+ * <pre>
+ *     (source) -> (map) -> (filter) -> (groupBy / reduce)
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class DataExchangeModeForwardTest extends CompilerTestBase {
+
+
+	@Test
+	public void testPipelinedForced() {
+		// PIPELINED_FORCED should result in pipelining all the way
+		verifySimpleForwardPlan(ExecutionMode.PIPELINED_FORCED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testPipelined() {
+		// PIPELINED should result in pipelining all the way
+		verifySimpleForwardPlan(ExecutionMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatch() {
+		// BATCH should batch only the connection into the reduce (the shuffle); all others stay pipelined
+		verifySimpleForwardPlan(ExecutionMode.BATCH,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatchForced() {
+		// BATCH_FORCED should batch every connection except the one into the combiner, which is expected to stay pipelined
+		verifySimpleForwardPlan(ExecutionMode.BATCH_FORCED,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH);
+	}
+
+	private void verifySimpleForwardPlan(ExecutionMode execMode,
+										DataExchangeMode toMap,
+										DataExchangeMode toFilter,
+										DataExchangeMode toKeyExtractor,
+										DataExchangeMode toCombiner,
+										DataExchangeMode toReduce,
+										DataExchangeMode toSink)
+	{
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.getConfig().setExecutionMode(execMode);
+
+			DataSet<String> dataSet = env.readTextFile("/never/accessed");
+			dataSet
+				.map(new MapFunction<String, Integer>() {
+					@Override
+					public Integer map(String value) {
+						return 0;
+					}
+				})
+				.filter(new FilterFunction<Integer>() {
+					@Override
+					public boolean filter(Integer value) {
+						return false;
+					}
+				})
+				.groupBy(new IdentityKeyExtractor<Integer>())
+				.reduceGroup(new Top1GroupReducer<Integer>())
+				.output(new DiscardingOutputFormat<Integer>());
+
+			OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+			SinkPlanNode sinkNode = optPlan.getDataSinks().iterator().next();
+
+			SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getPredecessor();
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getPredecessor();
+			SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode) combineNode.getPredecessor();
+
+			SingleInputPlanNode filterNode = (SingleInputPlanNode) keyExtractorNode.getPredecessor();
+			SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+			assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+			assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+			assertEquals(toKeyExtractor, keyExtractorNode.getInput().getDataExchangeMode());
+			assertEquals(toCombiner, combineNode.getInput().getDataExchangeMode());
+			assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode());
+			assertEquals(toSink, sinkNode.getInput().getDataExchangeMode());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
new file mode 100644
index 0000000..6b2691a
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test checks the correct assignment of the DataExchangeMode to
+ * connections for programs that branch, but do not re-join the branches.
+ *
+ * <pre>
+ *                      /---> (filter) -> (sink)
+ *                     /
+ *                    /
+ * (source) -> (map) -----------------\
+ *                    \               (join) -> (sink)
+ *                     \   (source) --/
+ *                      \
+ *                       \
+ *                        \-> (sink)
+ * </pre>
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
+
+	@Test
+	public void testPipelinedForced() {
+		// PIPELINED_FORCED should result in pipelining all the way
+		verifyBranchigPlan(ExecutionMode.PIPELINED_FORCED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testPipelined() {
+		// PIPELINED should result in pipelining all the way
+		verifyBranchigPlan(ExecutionMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatch() {
+		// BATCH should batch only the two join inputs; all other connections stay pipelined
+		verifyBranchigPlan(ExecutionMode.BATCH,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatchForced() {
+		// BATCH_FORCED should result in batching all the way
+		verifyBranchigPlan(ExecutionMode.BATCH_FORCED,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH);
+	}
+
+	private void verifyBranchigPlan(ExecutionMode execMode,
+									DataExchangeMode toMap,
+									DataExchangeMode toFilter,
+									DataExchangeMode toFilterSink,
+									DataExchangeMode toJoin1,
+									DataExchangeMode toJoin2,
+									DataExchangeMode toJoinSink,
+									DataExchangeMode toDirectSink)
+	{
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.getConfig().setExecutionMode(execMode);
+
+			DataSet<Tuple2<Long, Long>> data = env.generateSequence(1, 100000)
+					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Long value) {
+							return new Tuple2<Long, Long>(value, value);
+						}
+					});
+
+			// output 1
+			data
+					.filter(new FilterFunction<Tuple2<Long, Long>>() {
+						@Override
+						public boolean filter(Tuple2<Long, Long> value) {
+							return false;
+						}
+					})
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink1");
+
+			// output 2 does a join before the sink
+			data
+					.join(env.fromElements(new Tuple2<Long, Long>(1L, 2L)))
+					.where(1)
+					.equalTo(0)
+					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("sink2");
+
+			// output 3 is direct
+			data
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink3");
+
+			OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+
+			SinkPlanNode filterSink = findSink(optPlan.getDataSinks(), "sink1");
+			SinkPlanNode joinSink = findSink(optPlan.getDataSinks(), "sink2");
+			SinkPlanNode directSink = findSink(optPlan.getDataSinks(), "sink3");
+
+			SingleInputPlanNode filterNode = (SingleInputPlanNode) filterSink.getPredecessor();
+			SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+			DualInputPlanNode joinNode = (DualInputPlanNode) joinSink.getPredecessor();
+			assertEquals(mapNode, joinNode.getInput1().getSource());
+
+			assertEquals(mapNode, directSink.getPredecessor());
+
+			assertEquals(toFilterSink, filterSink.getInput().getDataExchangeMode());
+			assertEquals(toJoinSink, joinSink.getInput().getDataExchangeMode());
+			assertEquals(toDirectSink, directSink.getInput().getDataExchangeMode());
+
+			assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+			assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+
+			assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode());
+			assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
+		for (SinkPlanNode node : collection) {
+			String nodeName = node.getOptimizerNode().getOperator().getName();
+			if (nodeName != null && nodeName.equals(name)) {
+				return node;
+			}
+		}
+
+		throw new IllegalArgumentException("No node with that name was found.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
new file mode 100644
index 0000000..1a14be5
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.traversals.BranchesVisitor;
+import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
+import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * This test checks whether connections are correctly marked as pipeline breaking.
+ */
+@SuppressWarnings("serial")
+public class PipelineBreakingTest {
+
+	/**
+	 * Tests that no pipeline breakers are inserted into a simple forward
+	 * pipeline.
+	 *
+	 * <pre>
+	 *     (source) -> (map) -> (filter) -> (groupBy / reduce)
+	 * </pre>
+	 */
+	@Test
+	public void testSimpleForwardPlan() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<String> dataSet = env.readTextFile("/never/accessed");
+			dataSet
+				.map(new MapFunction<String, Integer>() {
+					@Override
+					public Integer map(String value) {
+						return 0;
+					}
+				})
+				.filter(new FilterFunction<Integer>() {
+					@Override
+					public boolean filter(Integer value) {
+						return false;
+					}
+				})
+				.groupBy(new IdentityKeyExtractor<Integer>())
+				.reduceGroup(new Top1GroupReducer<Integer>())
+				.output(new DiscardingOutputFormat<Integer>());
+
+			DataSinkNode sinkNode = convertPlan(env.createProgramPlan()).get(0);
+
+			SingleInputNode reduceNode = (SingleInputNode) sinkNode.getPredecessorNode();
+			SingleInputNode keyExtractorNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+			SingleInputNode filterNode = (SingleInputNode) keyExtractorNode.getPredecessorNode();
+			SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+			assertFalse(sinkNode.getInputConnection().isBreakingPipeline());
+			assertFalse(reduceNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(keyExtractorNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests that branching plans, where the branches are not re-joined,
+	 * do not place pipeline breakers.
+	 * 
+	 * <pre>
+	 *                      /---> (filter) -> (sink)
+	 *                     /
+	 *                    /
+	 * (source) -> (map) -----------------\
+	 *                    \               (join) -> (sink)
+	 *                     \   (source) --/
+	 *                      \
+	 *                       \
+	 *                        \-> (sink)
+	 * </pre>
+	 */
+	@Test
+	public void testBranchingPlanNotReJoined() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Integer> data = env.readTextFile("/never/accessed")
+				.map(new MapFunction<String, Integer>() {
+					@Override
+					public Integer map(String value) {
+						return 0;
+					}
+				});
+
+			// output 1
+			data
+				.filter(new FilterFunction<Integer>() {
+					@Override
+					public boolean filter(Integer value) {
+						return false;
+					}
+				})
+				.output(new DiscardingOutputFormat<Integer>());
+
+			// output 2 does a join before the sink
+			data
+				.join(env.fromElements(1, 2, 3, 4))
+					.where(new IdentityKeyExtractor<Integer>())
+					.equalTo(new IdentityKeyExtractor<Integer>())
+				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+			// output 3 is direct
+			data
+				.output(new DiscardingOutputFormat<Integer>());
+
+			List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+			// gather the optimizer DAG nodes
+
+			DataSinkNode sinkAfterFilter = sinks.get(0);
+			DataSinkNode sinkAfterJoin = sinks.get(1);
+			DataSinkNode sinkDirect = sinks.get(2);
+
+			SingleInputNode filterNode = (SingleInputNode) sinkAfterFilter.getPredecessorNode();
+			SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+			TwoInputNode joinNode = (TwoInputNode) sinkAfterJoin.getPredecessorNode();
+			SingleInputNode joinInput = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+			// verify the non-pipeline breaking status
+
+			assertFalse(sinkAfterFilter.getInputConnection().isBreakingPipeline());
+			assertFalse(sinkAfterJoin.getInputConnection().isBreakingPipeline());
+			assertFalse(sinkDirect.getInputConnection().isBreakingPipeline());
+
+			assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+
+			assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+			assertFalse(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+			assertFalse(joinInput.getIncomingConnection().isBreakingPipeline());
+
+			// some other sanity checks on the plan construction (cannot hurt)
+
+			assertEquals(mapNode, ((SingleInputNode) joinNode.getFirstPredecessorNode()).getPredecessorNode());
+			assertEquals(mapNode, sinkDirect.getPredecessorNode());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests that branches that are re-joined have pipeline breakers placed.
+	 * 
+	 * <pre>
+	 *                                         /-> (sink)
+	 *                                        /
+	 *                         /-> (reduce) -+          /-> (flatmap) -> (sink)
+	 *                        /               \        /
+	 *     (source) -> (map) -                (join) -+-----\
+	 *                        \               /              \
+	 *                         \-> (filter) -+                \
+	 *                                       \                (co group) -> (sink)
+	 *                                        \                /
+	 *                                         \-> (reduce) - /
+	 * </pre>
+	 */
+	@Test
+	public void testReJoinedBranches() {
+		try {
+			// build a test program
+
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
+					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Long value) {
+							return new Tuple2<Long, Long>(value, value);
+						}
+					});
+
+			DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
+			reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			
+			DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() {
+				@Override
+				public boolean filter(Tuple2<Long, Long> value) throws Exception {
+					return false;
+				}
+			});
+			
+			DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered)
+					.where(1).equalTo(1)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+			
+			joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>())
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+			joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()))
+					.where(0).equalTo(0)
+					.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+			List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+			// gather the optimizer DAG nodes
+
+			DataSinkNode sinkAfterReduce = sinks.get(0);
+			DataSinkNode sinkAfterFlatMap = sinks.get(1);
+			DataSinkNode sinkAfterCoGroup = sinks.get(2);
+
+			SingleInputNode reduceNode = (SingleInputNode) sinkAfterReduce.getPredecessorNode();
+			SingleInputNode mapNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+			SingleInputNode flatMapNode = (SingleInputNode) sinkAfterFlatMap.getPredecessorNode();
+			TwoInputNode joinNode = (TwoInputNode) flatMapNode.getPredecessorNode();
+			SingleInputNode filterNode = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+			TwoInputNode coGroupNode = (TwoInputNode) sinkAfterCoGroup.getPredecessorNode();
+			SingleInputNode otherReduceNode = (SingleInputNode) coGroupNode.getSecondPredecessorNode();
+
+			// test sanity checks (that we constructed the DAG correctly)
+
+			assertEquals(reduceNode, joinNode.getFirstPredecessorNode());
+			assertEquals(mapNode, filterNode.getPredecessorNode());
+			assertEquals(joinNode, coGroupNode.getFirstPredecessorNode());
+			assertEquals(filterNode, otherReduceNode.getPredecessorNode());
+
+			// verify the pipeline breaking status
+
+			assertFalse(sinkAfterReduce.getInputConnection().isBreakingPipeline());
+			assertFalse(sinkAfterFlatMap.getInputConnection().isBreakingPipeline());
+			assertFalse(sinkAfterCoGroup.getInputConnection().isBreakingPipeline());
+
+			assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(flatMapNode.getIncomingConnection().isBreakingPipeline());
+			assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+			assertFalse(coGroupNode.getFirstIncomingConnection().isBreakingPipeline());
+			assertFalse(coGroupNode.getSecondIncomingConnection().isBreakingPipeline());
+
+			// these should be pipeline breakers
+			assertTrue(reduceNode.getIncomingConnection().isBreakingPipeline());
+			assertTrue(filterNode.getIncomingConnection().isBreakingPipeline());
+			assertTrue(otherReduceNode.getIncomingConnection().isBreakingPipeline());
+			assertTrue(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private static List<DataSinkNode> convertPlan(Plan p) {
+		GraphCreatingVisitor dagCreator =
+				new GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
+
+		// create the DAG
+		p.accept(dagCreator);
+		List<DataSinkNode> sinks = dagCreator.getSinks();
+
+		// build a single root and run the branch tracking logic
+		OptimizerNode rootNode;
+		if (sinks.size() == 1) {
+			rootNode = sinks.get(0);
+		}
+		else {
+			Iterator<DataSinkNode> iter = sinks.iterator();
+			rootNode = iter.next();
+
+			while (iter.hasNext()) {
+				rootNode = new SinkJoiner(rootNode, iter.next());
+			}
+		}
+		rootNode.accept(new IdAndEstimatesVisitor(null));
+		rootNode.accept(new BranchesVisitor());
+
+		return sinks;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..3e32905
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class GlobalPropertiesFilteringTest {
+	// Tuple8 of ints; passed to SemanticPropUtil as the type information in the tests below
+	private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+			new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	@Test
+	public void testAllErased1() {
+		// freshly created semantic properties: no forwarded fields are declared
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+
+		GlobalProperties input = new GlobalProperties();
+		input.setHashPartitioned(new FieldList(0, 1));
+		input.addUniqueFieldCombination(new FieldSet(3, 4));
+		input.addUniqueFieldCombination(new FieldSet(5, 6));
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, filtered.getPartitioning());
+		assertNull(filtered.getPartitioningFields());
+		assertNull(filtered.getPartitioningOrdering());
+		assertNull(filtered.getUniqueFieldCombination());
+	}
+
+	@Test
+	public void testAllErased2() {
+		// only field 2 is declared as forwarded, and no property below uses field 2
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"2"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.setHashPartitioned(new FieldList(0, 1));
+		input.addUniqueFieldCombination(new FieldSet(3, 4));
+		input.addUniqueFieldCombination(new FieldSet(5, 6));
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, filtered.getPartitioning());
+		assertNull(filtered.getPartitioningFields());
+		assertNull(filtered.getPartitioningOrdering());
+		assertNull(filtered.getUniqueFieldCombination());
+	}
+
+	@Test
+	public void testHashPartitioningPreserved1() {
+		// all three partitioning fields (0, 1, 4) are forwarded to the same positions
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.setHashPartitioned(new FieldList(0, 1, 4));
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+		FieldList fields = filtered.getPartitioningFields();
+		assertEquals(3, fields.size());
+		assertTrue(fields.contains(0));
+		assertTrue(fields.contains(1));
+		assertTrue(fields.contains(4));
+	}
+
+	@Test
+	public void testHashPartitioningPreserved2() {
+		// fields 0, 1, 4 are forwarded to positions 1, 2, 3; the partitioning must follow them
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.setHashPartitioned(new FieldList(0, 1, 4));
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+		FieldList fields = filtered.getPartitioningFields();
+		assertEquals(3, fields.size());
+		assertTrue(fields.contains(1));
+		assertTrue(fields.contains(2));
+		assertTrue(fields.contains(3));
+	}
+
+	@Test
+	public void testHashPartitioningErased() {
+		// partitioning field 4 is not among the forwarded fields (0 and 1)
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.setHashPartitioned(new FieldList(0, 1, 4));
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, filtered.getPartitioning());
+		assertNull(filtered.getPartitioningFields());
+	}
+
+	@Test
+	public void testAnyPartitioningPreserved1() {
+		// all three partitioning fields (0, 1, 4) are forwarded to the same positions
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.setAnyPartitioning(new FieldList(0, 1, 4));
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+		FieldList fields = filtered.getPartitioningFields();
+		assertEquals(3, fields.size());
+		assertTrue(fields.contains(0));
+		assertTrue(fields.contains(1));
+		assertTrue(fields.contains(4));
+	}
+
+	@Test
+	public void testAnyPartitioningPreserved2() {
+		// fields 0, 1, 4 are forwarded to positions 1, 2, 3; the partitioning must follow them
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.setAnyPartitioning(new FieldList(0, 1, 4));
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+		FieldList fields = filtered.getPartitioningFields();
+		assertEquals(3, fields.size());
+		assertTrue(fields.contains(1));
+		assertTrue(fields.contains(2));
+		assertTrue(fields.contains(3));
+	}
+
+	@Test
+	public void testAnyPartitioningErased() {
+		// partitioning field 4 is not among the forwarded fields (0 and 1)
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.setAnyPartitioning(new FieldList(0, 1, 4));
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, filtered.getPartitioning());
+		assertNull(filtered.getPartitioningFields());
+	}
+
+	@Test
+	public void testCustomPartitioningPreserved1() {
+		// both partitioning fields (0 and 4) are forwarded to the same positions
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+		Partitioner<Tuple2<Long, Integer>> partitioner = new MockPartitioner();
+		GlobalProperties input = new GlobalProperties();
+		input.setCustomPartitioned(new FieldList(0, 4), partitioner);
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, filtered.getPartitioning());
+		FieldList fields = filtered.getPartitioningFields();
+		assertEquals(2, fields.size());
+		assertTrue(fields.contains(0));
+		assertTrue(fields.contains(4));
+		assertEquals(partitioner, filtered.getCustomPartitioner());
+	}
+
+	@Test
+	public void testCustomPartitioningPreserved2() {
+		// partitioning fields 0 and 4 are forwarded to positions 1 and 3
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+		Partitioner<Tuple2<Long, Integer>> partitioner = new MockPartitioner();
+		GlobalProperties input = new GlobalProperties();
+		input.setCustomPartitioned(new FieldList(0, 4), partitioner);
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, filtered.getPartitioning());
+		FieldList fields = filtered.getPartitioningFields();
+		assertEquals(2, fields.size());
+		assertTrue(fields.contains(1));
+		assertTrue(fields.contains(3));
+		assertEquals(partitioner, filtered.getCustomPartitioner());
+	}
+
+	@Test
+	public void testCustomPartitioningErased() {
+		// partitioning field 4 is not among the forwarded fields (0 and 1)
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+		Partitioner<Tuple2<Long, Integer>> partitioner = new MockPartitioner();
+		GlobalProperties input = new GlobalProperties();
+		input.setCustomPartitioned(new FieldList(0, 4), partitioner);
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, filtered.getPartitioning());
+		assertNull(filtered.getPartitioningFields());
+		assertNull(filtered.getCustomPartitioner());
+	}
+
+	@Test
+	public void testRangePartitioningPreserved1() {
+		// every field of the range ordering (1, 5, 2) is forwarded to the same position
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"1;2;5"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering rangeOrder = new Ordering()
+				.appendOrdering(1, IntValue.class, Order.ASCENDING)
+				.appendOrdering(5, LongValue.class, Order.DESCENDING)
+				.appendOrdering(2, StringValue.class, Order.ASCENDING);
+		GlobalProperties input = new GlobalProperties();
+		input.setRangePartitioned(rangeOrder);
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+		FieldList fields = filtered.getPartitioningFields();
+		assertEquals(3, fields.size());
+		assertEquals(1, fields.get(0).intValue());
+		assertEquals(5, fields.get(1).intValue());
+		assertEquals(2, fields.get(2).intValue());
+		Ordering ordering = filtered.getPartitioningOrdering();
+		assertEquals(3, ordering.getNumberOfFields());
+		assertEquals(1, ordering.getFieldNumber(0).intValue());
+		assertEquals(5, ordering.getFieldNumber(1).intValue());
+		assertEquals(2, ordering.getFieldNumber(2).intValue());
+		assertEquals(Order.ASCENDING, ordering.getOrder(0));
+		assertEquals(Order.DESCENDING, ordering.getOrder(1));
+		assertEquals(Order.ASCENDING, ordering.getOrder(2));
+		assertEquals(IntValue.class, ordering.getType(0));
+		assertEquals(LongValue.class, ordering.getType(1));
+		assertEquals(StringValue.class, ordering.getType(2));
+	}
+
+	@Test
+	public void testRangePartitioningPreserved2() {
+		// ordering fields 1, 5, 2 are forwarded to positions 3, 1, 0 (order and types stay)
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"1->3; 2->0; 5->1"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering rangeOrder = new Ordering()
+				.appendOrdering(1, IntValue.class, Order.ASCENDING)
+				.appendOrdering(5, LongValue.class, Order.DESCENDING)
+				.appendOrdering(2, StringValue.class, Order.ASCENDING);
+		GlobalProperties input = new GlobalProperties();
+		input.setRangePartitioned(rangeOrder);
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+		FieldList fields = filtered.getPartitioningFields();
+		assertEquals(3, fields.size());
+		assertEquals(3, fields.get(0).intValue());
+		assertEquals(1, fields.get(1).intValue());
+		assertEquals(0, fields.get(2).intValue());
+		Ordering ordering = filtered.getPartitioningOrdering();
+		assertEquals(3, ordering.getNumberOfFields());
+		assertEquals(3, ordering.getFieldNumber(0).intValue());
+		assertEquals(1, ordering.getFieldNumber(1).intValue());
+		assertEquals(0, ordering.getFieldNumber(2).intValue());
+		assertEquals(Order.ASCENDING, ordering.getOrder(0));
+		assertEquals(Order.DESCENDING, ordering.getOrder(1));
+		assertEquals(Order.ASCENDING, ordering.getOrder(2));
+		assertEquals(IntValue.class, ordering.getType(0));
+		assertEquals(LongValue.class, ordering.getType(1));
+		assertEquals(StringValue.class, ordering.getType(2));
+	}
+
+	@Test
+	public void testRangePartitioningErased() {
+		// ordering field 2 is not among the forwarded fields (1 and 5)
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"1;5"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering rangeOrder = new Ordering()
+				.appendOrdering(1, IntValue.class, Order.ASCENDING)
+				.appendOrdering(5, LongValue.class, Order.DESCENDING)
+				.appendOrdering(2, StringValue.class, Order.ASCENDING);
+		GlobalProperties input = new GlobalProperties();
+		input.setRangePartitioned(rangeOrder);
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, filtered.getPartitioning());
+		assertNull(filtered.getPartitioningOrdering());
+		assertNull(filtered.getPartitioningFields());
+	}
+
+	@Test
+	public void testRebalancingPreserved() {
+		// forced rebalancing is expected to survive the filtering, with no partitioning fields
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.setForcedRebalanced();
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		assertEquals(PartitioningProperty.FORCED_REBALANCED, filtered.getPartitioning());
+		assertNull(filtered.getPartitioningFields());
+	}
+
+	@Test
+	public void testUniqueFieldGroupsPreserved1() {
+		// fields 0-4 are forwarded to the same positions; 5, 6 and 7 are not forwarded,
+		// so only the combination (4, 5, 6, 7) is expected to be dropped
+		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties gprops = new GlobalProperties();
+		gprops.addUniqueFieldCombination(new FieldSet(0, 1, 2));
+		gprops.addUniqueFieldCombination(new FieldSet(3, 4));
+		gprops.addUniqueFieldCombination(new FieldSet(4, 5, 6, 7));
+
+		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+		Set<FieldSet> unique = result.getUniqueFieldCombination();
+		FieldSet expected1 = new FieldSet(0, 1, 2);
+		FieldSet expected2 = new FieldSet(3, 4);
+
+		// assertEquals reports the actual size on failure; assertTrue(size == 2) would not
+		Assert.assertEquals(2, unique.size());
+		Assert.assertTrue(unique.contains(expected1));
+		Assert.assertTrue(unique.contains(expected2));
+	}
+
+	@Test
+	public void testUniqueFieldGroupsPreserved2() {
+		// field 0 moves to 5 and field 3 moves to 6; fields 1, 2 and 4 keep their position.
+		// Input fields 5, 6 and 7 are not forwarded, so (4, 5, 6, 7) is expected to be dropped
+		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->5;1;2;3->6;4"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties gprops = new GlobalProperties();
+		gprops.addUniqueFieldCombination(new FieldSet(0, 1, 2));
+		gprops.addUniqueFieldCombination(new FieldSet(3, 4));
+		gprops.addUniqueFieldCombination(new FieldSet(4, 5, 6, 7));
+
+		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+		Set<FieldSet> unique = result.getUniqueFieldCombination();
+		FieldSet expected1 = new FieldSet(1, 2, 5);
+		FieldSet expected2 = new FieldSet(4, 6);
+
+		// assertEquals reports the actual size on failure; assertTrue(size == 2) would not
+		Assert.assertEquals(2, unique.size());
+		Assert.assertTrue(unique.contains(expected1));
+		Assert.assertTrue(unique.contains(expected2));
+	}
+
+	@Test
+	public void testUniqueFieldGroupsErased() {
+		// fields 1, 2 and 4 are not forwarded, and every unique combination
+		// registered below contains at least one of them
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0; 3; 5; 6; 7"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.addUniqueFieldCombination(new FieldSet(0, 1, 2));
+		input.addUniqueFieldCombination(new FieldSet(3, 4));
+		input.addUniqueFieldCombination(new FieldSet(4, 5, 6, 7));
+
+		GlobalProperties filtered = input.filterBySemanticProperties(forwards, 0);
+
+		Assert.assertNull(filtered.getUniqueFieldCombination());
+	}
+
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testInvalidInputIndex() {
+		// filtering against input index 1 of single-input semantic properties must throw
+		SingleInputSemanticProperties forwards = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(forwards, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+		GlobalProperties input = new GlobalProperties();
+		input.setHashPartitioned(new FieldList(0, 1));
+
+		input.filterBySemanticProperties(forwards, 1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
new file mode 100644
index 0000000..52826d6
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+public class GlobalPropertiesMatchingTest {
+
+	@Test
+	public void testMatchingAnyPartitioning() {
+		try {
+			// the request is built from a FieldSet; see the strict tests for the FieldList case
+			RequestedGlobalProperties request = new RequestedGlobalProperties();
+			request.setAnyPartitioning(new FieldSet(6, 2));
+
+			// match any partitioning
+			{
+				GlobalProperties fields26 = new GlobalProperties();
+				fields26.setAnyPartitioning(new FieldList(2, 6));
+				assertTrue(request.isMetBy(fields26));
+
+				GlobalProperties fields62 = new GlobalProperties();
+				fields62.setAnyPartitioning(new FieldList(6, 2));
+				assertTrue(request.isMetBy(fields62));
+
+				GlobalProperties fields624 = new GlobalProperties();
+				fields624.setAnyPartitioning(new FieldList(6, 2, 4));
+				assertFalse(request.isMetBy(fields624));
+
+				GlobalProperties fields61 = new GlobalProperties();
+				fields61.setAnyPartitioning(new FieldList(6, 1));
+				assertFalse(request.isMetBy(fields61));
+
+				GlobalProperties fields2 = new GlobalProperties();
+				fields2.setAnyPartitioning(new FieldList(2));
+				assertTrue(request.isMetBy(fields2));
+			}
+
+			// match hash partitioning
+			{
+				GlobalProperties fields26 = new GlobalProperties();
+				fields26.setHashPartitioned(new FieldList(2, 6));
+				assertTrue(request.isMetBy(fields26));
+
+				GlobalProperties fields62 = new GlobalProperties();
+				fields62.setHashPartitioned(new FieldList(6, 2));
+				assertTrue(request.isMetBy(fields62));
+
+				GlobalProperties fields61 = new GlobalProperties();
+				fields61.setHashPartitioned(new FieldList(6, 1));
+				assertFalse(request.isMetBy(fields61));
+			}
+
+			// match range partitioning
+			{
+				GlobalProperties fields26 = new GlobalProperties();
+				fields26.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+				assertTrue(request.isMetBy(fields26));
+
+				GlobalProperties fields62 = new GlobalProperties();
+				fields62.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+				assertTrue(request.isMetBy(fields62));
+
+				GlobalProperties fields61 = new GlobalProperties();
+				fields61.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+				assertFalse(request.isMetBy(fields61));
+
+				GlobalProperties fields6 = new GlobalProperties();
+				fields6.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+				assertTrue(request.isMetBy(fields6));
+			}
+
+			// match custom partitioning
+			{
+				GlobalProperties fields26 = new GlobalProperties();
+				fields26.setCustomPartitioned(new FieldList(2, 6), new MockPartitioner());
+				assertTrue(request.isMetBy(fields26));
+
+				GlobalProperties fields62 = new GlobalProperties();
+				fields62.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
+				assertTrue(request.isMetBy(fields62));
+
+				GlobalProperties fields61 = new GlobalProperties();
+				fields61.setCustomPartitioned(new FieldList(6, 1), new MockPartitioner());
+				assertFalse(request.isMetBy(fields61));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testMatchingCustomPartitioning() {
+		try {
+			final Partitioner<Tuple2<Long, Integer>> requestedPartitioner = new MockPartitioner();
+			// a different MockPartitioner instance is expected not to satisfy the request
+			RequestedGlobalProperties request = new RequestedGlobalProperties();
+			request.setCustomPartitioned(new FieldSet(6, 2), requestedPartitioner);
+
+			// match custom partitionings
+			{
+				GlobalProperties fields26 = new GlobalProperties();
+				fields26.setCustomPartitioned(new FieldList(2, 6), requestedPartitioner);
+				assertTrue(request.isMetBy(fields26));
+
+				GlobalProperties fields62 = new GlobalProperties();
+				fields62.setCustomPartitioned(new FieldList(6, 2), requestedPartitioner);
+				assertTrue(request.isMetBy(fields62));
+
+				GlobalProperties otherPartitioner = new GlobalProperties();
+				otherPartitioner.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
+				assertFalse(request.isMetBy(otherPartitioner));
+			}
+
+			// cannot match other types of partitionings
+			{
+				GlobalProperties anyPartitioned = new GlobalProperties();
+				anyPartitioned.setAnyPartitioning(new FieldList(6, 2));
+				assertFalse(request.isMetBy(anyPartitioned));
+
+				GlobalProperties hashPartitioned = new GlobalProperties();
+				hashPartitioned.setHashPartitioned(new FieldList(6, 2));
+				assertFalse(request.isMetBy(hashPartitioned));
+
+				GlobalProperties rangePartitioned = new GlobalProperties();
+				rangePartitioned.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+				assertFalse(request.isMetBy(rangePartitioned));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testStrictlyMatchingAnyPartitioning() {
+		// the request is built from a FieldList: only the exact field sequence (6, 2) may match
+		RequestedGlobalProperties req = new RequestedGlobalProperties();
+		req.setAnyPartitioning(new FieldList(6, 2));
+
+		// match any partitioning
+		{
+			GlobalProperties gp1 = new GlobalProperties();
+			gp1.setAnyPartitioning(new FieldList(6, 2));
+			assertTrue(req.isMetBy(gp1));
+
+			GlobalProperties gp2 = new GlobalProperties();
+			gp2.setAnyPartitioning(new FieldList(2, 6));
+			assertFalse(req.isMetBy(gp2));
+
+			GlobalProperties gp3 = new GlobalProperties();
+			gp3.setAnyPartitioning(new FieldList(6, 2, 3));
+			assertFalse(req.isMetBy(gp3));
+
+			GlobalProperties gp4 = new GlobalProperties();
+			gp4.setAnyPartitioning(new FieldList(6, 1));
+			assertFalse(req.isMetBy(gp4));
+
+			GlobalProperties gp5 = new GlobalProperties();
+			gp5.setAnyPartitioning(new FieldList(2));
+			assertFalse(req.isMetBy(gp5));
+		}
+
+		// match hash partitioning
+		{
+			GlobalProperties gp1 = new GlobalProperties();
+			gp1.setHashPartitioned(new FieldList(6, 2));
+			assertTrue(req.isMetBy(gp1));
+
+			GlobalProperties gp2 = new GlobalProperties();
+			gp2.setHashPartitioned(new FieldList(2, 6));
+			assertFalse(req.isMetBy(gp2));
+
+			GlobalProperties gp3 = new GlobalProperties();
+			gp3.setHashPartitioned(new FieldList(6, 1));
+			assertFalse(req.isMetBy(gp3));
+		}
+
+		// match range partitioning
+		{
+			GlobalProperties gp1 = new GlobalProperties();
+			gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+			assertTrue(req.isMetBy(gp1));
+
+			GlobalProperties gp2 = new GlobalProperties();
+			gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+			assertFalse(req.isMetBy(gp2));
+
+			GlobalProperties gp3 = new GlobalProperties();
+			gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+			assertFalse(req.isMetBy(gp3));
+
+			GlobalProperties gp4 = new GlobalProperties();
+			gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+			assertFalse(req.isMetBy(gp4));
+		}
+
+	}
+
+	@Test
+	public void testStrictlyMatchingHashPartitioning() {
+		// a hash request on FieldList (6, 2): only hash partitioning on exactly (6, 2) may match
+		RequestedGlobalProperties request = new RequestedGlobalProperties();
+		request.setHashPartitioned(new FieldList(6, 2));
+
+		// match any partitioning
+		{
+			GlobalProperties exact = new GlobalProperties();
+			exact.setAnyPartitioning(new FieldList(6, 2));
+			assertFalse(request.isMetBy(exact));
+
+			GlobalProperties reversed = new GlobalProperties();
+			reversed.setAnyPartitioning(new FieldList(2, 6));
+			assertFalse(request.isMetBy(reversed));
+
+			GlobalProperties otherField = new GlobalProperties();
+			otherField.setAnyPartitioning(new FieldList(6, 1));
+			assertFalse(request.isMetBy(otherField));
+
+			GlobalProperties singleField = new GlobalProperties();
+			singleField.setAnyPartitioning(new FieldList(2));
+			assertFalse(request.isMetBy(singleField));
+		}
+
+		// match hash partitioning
+		{
+			GlobalProperties exact = new GlobalProperties();
+			exact.setHashPartitioned(new FieldList(6, 2));
+			assertTrue(request.isMetBy(exact));
+
+			GlobalProperties reversed = new GlobalProperties();
+			reversed.setHashPartitioned(new FieldList(2, 6));
+			assertFalse(request.isMetBy(reversed));
+
+			GlobalProperties otherField = new GlobalProperties();
+			otherField.setHashPartitioned(new FieldList(6, 1));
+			assertFalse(request.isMetBy(otherField));
+
+			GlobalProperties extended = new GlobalProperties();
+			extended.setHashPartitioned(new FieldList(6, 2, 0));
+			assertFalse(request.isMetBy(extended));
+		}
+
+		// match range partitioning
+		{
+			GlobalProperties exact = new GlobalProperties();
+			exact.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+			assertFalse(request.isMetBy(exact));
+
+			GlobalProperties reversed = new GlobalProperties();
+			reversed.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+			assertFalse(request.isMetBy(reversed));
+
+			GlobalProperties otherField = new GlobalProperties();
+			otherField.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+			assertFalse(request.isMetBy(otherField));
+
+			GlobalProperties prefix = new GlobalProperties();
+			prefix.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+			assertFalse(request.isMetBy(prefix));
+		}
+
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
new file mode 100644
index 0000000..0868720
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+public class GlobalPropertiesPushdownTest {
+
+	@Test
+	public void testAnyPartitioningPushedDown() {
+		try {
+			RequestedGlobalProperties request = new RequestedGlobalProperties();
+			request.setAnyPartitioning(new FieldSet(3, 1));
+			// with the all-preserving properties the request must keep its partitioning
+			RequestedGlobalProperties kept = request.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+			assertEquals(PartitioningProperty.ANY_PARTITIONING, kept.getPartitioning());
+			assertTrue(kept.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
+			// with the none-preserving properties the request must vanish (null or trivial)
+			RequestedGlobalProperties dropped = request.filterBySemanticProperties(getNonePreservingSemProps(), 0);
+			assertTrue(dropped == null || dropped.isTrivial());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testHashPartitioningPushedDown() {
+		try {
+			RequestedGlobalProperties request = new RequestedGlobalProperties();
+			request.setHashPartitioned(new FieldSet(3, 1));
+			// with the all-preserving properties the request must keep its partitioning
+			RequestedGlobalProperties kept = request.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+			assertEquals(PartitioningProperty.HASH_PARTITIONED, kept.getPartitioning());
+			assertTrue(kept.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
+			// with the none-preserving properties the request must vanish (null or trivial)
+			RequestedGlobalProperties dropped = request.filterBySemanticProperties(getNonePreservingSemProps(), 0);
+			assertTrue(dropped == null || dropped.isTrivial());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningNotPushedDown() {
+		try {
+			RequestedGlobalProperties request = new RequestedGlobalProperties();
+			request.setCustomPartitioned(new FieldSet(3, 1), new MockPartitioner());
+			// even with the all-preserving properties the custom request must not survive
+			RequestedGlobalProperties filtered = request.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+			assertTrue(filtered == null || filtered.isTrivial());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testForcedReblancingNotPushedDown() {
+		try {
+			RequestedGlobalProperties request = new RequestedGlobalProperties();
+			request.setForceRebalancing();
+			// even with the all-preserving properties the rebalancing request must not survive
+			RequestedGlobalProperties filtered = request.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+			assertTrue(filtered == null || filtered.isTrivial());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// AllFieldsForwardedProperties instance; the tests use it as the "everything preserved" case
+	private static SemanticProperties getAllPreservingSemProps() {
+		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
+	}
+	// freshly created semantic properties; the tests use them as the "nothing preserved" case
+	private static SemanticProperties getNonePreservingSemProps() {
+		return new SingleInputSemanticProperties();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..1ff62ed
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+public class LocalPropertiesFilteringTest {
+
+	private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+			new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	/** With null for all three spec arrays, grouping, ordering and unique fields must all come back null. */
+	@Test
+	public void testAllErased1() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, null, null, null, tupleInfo, tupleInfo);
+
+		// grouping on (0, 1, 2) plus the unique field sets {3, 4} and {5, 6}
+		final LocalProperties original = LocalProperties.forGrouping(new FieldList(0, 1, 2))
+				.addUniqueFields(new FieldSet(3, 4))
+				.addUniqueFields(new FieldSet(5, 6));
+
+		final LocalProperties result = original.filterBySemanticProperties(semProps, 0);
+		assertNull(result.getGroupedFields());
+		assertNull(result.getOrdering());
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "5" names none of the grouped (0, 1, 2) or unique (3, 4) fields, so all properties must come back null. */
+	@Test
+	public void testAllErased2() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"5"}, null, null, tupleInfo, tupleInfo);
+
+		// grouping on (0, 1, 2) and the unique field set {3, 4}
+		final LocalProperties original = LocalProperties.forGrouping(new FieldList(0, 1, 2))
+				.addUniqueFields(new FieldSet(3, 4));
+
+		final LocalProperties result = original.filterBySemanticProperties(semProps, 0);
+		assertNull(result.getGroupedFields());
+		assertNull(result.getOrdering());
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "0;2;3" names every grouped field, so the grouping on 0, 2 and 3 must survive filtering. */
+	@Test
+	public void testGroupingPreserved1() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0;2;3"}, null, null, tupleInfo, tupleInfo);
+
+		final LocalProperties result = LocalProperties.forGrouping(new FieldList(0, 2, 3)).filterBySemanticProperties(semProps, 0);
+		final FieldList grouped = result.getGroupedFields();
+
+		assertNotNull(grouped);
+		assertEquals(3, grouped.size());
+		assertTrue(grouped.contains(0));
+		assertTrue(grouped.contains(2));
+		assertTrue(grouped.contains(3));
+		assertNull(result.getOrdering());
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "0->4;2->0;3->7" must turn the grouping on 0, 2 and 3 into a grouping on 4, 0 and 7. */
+	@Test
+	public void testGroupingPreserved2() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0->4;2->0;3->7"}, null, null, tupleInfo, tupleInfo);
+
+		final LocalProperties result = LocalProperties.forGrouping(new FieldList(0, 2, 3)).filterBySemanticProperties(semProps, 0);
+		final FieldList grouped = result.getGroupedFields();
+
+		assertNotNull(grouped);
+		assertEquals(3, grouped.size());
+		assertTrue(grouped.contains(4));
+		assertTrue(grouped.contains(0));
+		assertTrue(grouped.contains(7));
+		assertNull(result.getOrdering());
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "0->4;2->0" leaves out grouped field 3, so the whole grouping must come back null. */
+	@Test
+	public void testGroupingErased() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0->4;2->0"}, null, null, tupleInfo, tupleInfo);
+
+		final LocalProperties result = LocalProperties.forGrouping(new FieldList(0, 2, 3)).filterBySemanticProperties(semProps, 0);
+
+		// none of the three local properties is expected to remain
+		assertNull(result.getGroupedFields());
+		assertNull(result.getOrdering());
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "0;2;5" names all sort fields, so the ordering (2 asc, 0 desc, 5 desc) must survive as-is. */
+	@Test
+	public void testSortingPreserved1() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0;2;5"}, null, null, tupleInfo, tupleInfo);
+
+		final Ordering inputOrder = new Ordering();
+		inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+		final LocalProperties result = LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+		final FieldList grouped = result.getGroupedFields();
+		final Ordering filteredOrder = result.getOrdering();
+
+		assertNotNull(grouped);
+		assertEquals(3, grouped.size());
+		assertTrue(grouped.contains(0));
+		assertTrue(grouped.contains(2));
+		assertTrue(grouped.contains(5));
+		assertNotNull(filteredOrder);
+		assertEquals(3, filteredOrder.getNumberOfFields());
+		assertEquals(2, filteredOrder.getFieldNumber(0).intValue());
+		assertEquals(0, filteredOrder.getFieldNumber(1).intValue());
+		assertEquals(5, filteredOrder.getFieldNumber(2).intValue());
+		assertEquals(Order.ASCENDING, filteredOrder.getOrder(0));
+		assertEquals(Order.DESCENDING, filteredOrder.getOrder(1));
+		assertEquals(Order.DESCENDING, filteredOrder.getOrder(2));
+		assertEquals(IntValue.class, filteredOrder.getType(0));
+		assertEquals(StringValue.class, filteredOrder.getType(1));
+		assertEquals(LongValue.class, filteredOrder.getType(2));
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "0->3;2->7;5->1" must renumber the ordering to fields 7, 3, 1 while keeping directions and types. */
+	@Test
+	public void testSortingPreserved2() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0->3;2->7;5->1"}, null, null, tupleInfo, tupleInfo);
+
+		final Ordering inputOrder = new Ordering();
+		inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+		final LocalProperties result = LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+		final FieldList grouped = result.getGroupedFields();
+		final Ordering filteredOrder = result.getOrdering();
+
+		assertNotNull(grouped);
+		assertEquals(3, grouped.size());
+		assertTrue(grouped.contains(3));
+		assertTrue(grouped.contains(7));
+		assertTrue(grouped.contains(1));
+		assertNotNull(filteredOrder);
+		assertEquals(3, filteredOrder.getNumberOfFields());
+		assertEquals(7, filteredOrder.getFieldNumber(0).intValue());
+		assertEquals(3, filteredOrder.getFieldNumber(1).intValue());
+		assertEquals(1, filteredOrder.getFieldNumber(2).intValue());
+		assertEquals(Order.ASCENDING, filteredOrder.getOrder(0));
+		assertEquals(Order.DESCENDING, filteredOrder.getOrder(1));
+		assertEquals(Order.DESCENDING, filteredOrder.getOrder(2));
+		assertEquals(IntValue.class, filteredOrder.getType(0));
+		assertEquals(StringValue.class, filteredOrder.getType(1));
+		assertEquals(LongValue.class, filteredOrder.getType(2));
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "0;2" leaves out the last sort field 5, so only the prefix (2 asc, 0 desc) must remain. */
+	@Test
+	public void testSortingPreserved3() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0;2"}, null, null, tupleInfo, tupleInfo);
+
+		final Ordering inputOrder = new Ordering();
+		inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+		final LocalProperties result = LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+		final FieldList grouped = result.getGroupedFields();
+		final Ordering filteredOrder = result.getOrdering();
+
+		assertNotNull(grouped);
+		assertEquals(2, grouped.size());
+		assertTrue(grouped.contains(0));
+		assertTrue(grouped.contains(2));
+		assertNotNull(filteredOrder);
+		assertEquals(2, filteredOrder.getNumberOfFields());
+		assertEquals(2, filteredOrder.getFieldNumber(0).intValue());
+		assertEquals(0, filteredOrder.getFieldNumber(1).intValue());
+		assertEquals(Order.ASCENDING, filteredOrder.getOrder(0));
+		assertEquals(Order.DESCENDING, filteredOrder.getOrder(1));
+		assertEquals(IntValue.class, filteredOrder.getType(0));
+		assertEquals(StringValue.class, filteredOrder.getType(1));
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "2->7;5" leaves out the second sort field 0, so only the first field must remain, renumbered to 7. */
+	@Test
+	public void testSortingPreserved4() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"2->7;5"}, null, null, tupleInfo, tupleInfo);
+
+		final Ordering inputOrder = new Ordering();
+		inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+		final LocalProperties result = LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+		final FieldList grouped = result.getGroupedFields();
+		final Ordering filteredOrder = result.getOrdering();
+
+		assertNotNull(grouped);
+		assertEquals(1, grouped.size());
+		assertTrue(grouped.contains(7));
+		assertNotNull(filteredOrder);
+		assertEquals(1, filteredOrder.getNumberOfFields());
+		assertEquals(7, filteredOrder.getFieldNumber(0).intValue());
+		assertEquals(Order.ASCENDING, filteredOrder.getOrder(0));
+		assertEquals(IntValue.class, filteredOrder.getType(0));
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "0;5" leaves out the first sort field 2, so ordering and grouping must both come back null. */
+	@Test
+	public void testSortingErased() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0;5"}, null, null, tupleInfo, tupleInfo);
+
+		// sort order under test: field 2 ascending, then fields 0 and 5 descending
+		final Ordering inputOrder = new Ordering();
+		inputOrder.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		inputOrder.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		inputOrder.appendOrdering(5, LongValue.class, Order.DESCENDING);
+
+		final LocalProperties result = LocalProperties.forOrdering(inputOrder).filterBySemanticProperties(semProps, 0);
+
+		// none of the three local properties is expected to remain
+		assertNull(result.getGroupedFields());
+		assertNull(result.getOrdering());
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Spec "0;1;2;3;4" must keep the unique sets {0,1,2} and {3,4} and drop {4,5,6}. */
+	@Test
+	public void testUniqueFieldsPreserved1() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+		final LocalProperties original = new LocalProperties()
+				.addUniqueFields(new FieldSet(0, 1, 2))
+				.addUniqueFields(new FieldSet(3, 4))
+				.addUniqueFields(new FieldSet(4, 5, 6));
+
+		final LocalProperties result = original.filterBySemanticProperties(semProps, 0);
+		final FieldSet firstKept = new FieldSet(0, 1, 2);
+		final FieldSet secondKept = new FieldSet(3, 4);
+
+		assertNull(result.getGroupedFields());
+		assertNull(result.getOrdering());
+		assertNotNull(result.getUniqueFields());
+		assertEquals(2, result.getUniqueFields().size());
+		assertTrue(result.getUniqueFields().contains(firstKept));
+		assertTrue(result.getUniqueFields().contains(secondKept));
+	}
+
+	/** Spec "0;1;2;3;4" must keep the grouping on (1, 2) plus the unique sets {0,1,2} and {3,4}, dropping {4,5,6}. */
+	@Test
+	public void testUniqueFieldsPreserved2() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+		final LocalProperties original = LocalProperties.forGrouping(new FieldList(1, 2))
+				.addUniqueFields(new FieldSet(0, 1, 2))
+				.addUniqueFields(new FieldSet(3, 4))
+				.addUniqueFields(new FieldSet(4, 5, 6));
+
+		final LocalProperties result = original.filterBySemanticProperties(semProps, 0);
+		final FieldSet firstKept = new FieldSet(0, 1, 2);
+		final FieldSet secondKept = new FieldSet(3, 4);
+
+		assertNull(result.getOrdering());
+		assertNotNull(result.getGroupedFields());
+		assertEquals(2, result.getGroupedFields().size());
+		assertTrue(result.getGroupedFields().contains(1));
+		assertTrue(result.getGroupedFields().contains(2));
+		assertNotNull(result.getUniqueFields());
+		assertEquals(2, result.getUniqueFields().size());
+		assertTrue(result.getUniqueFields().contains(firstKept));
+		assertTrue(result.getUniqueFields().contains(secondKept));
+	}
+
+	/** Spec "0->7;1->6;2->5;3->4;4->3" must yield the unique sets {5,6,7} and {3,4} and drop {4,5,6}. */
+	@Test
+	public void testUniqueFieldsPreserved3() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0->7;1->6;2->5;3->4;4->3"}, null, null, tupleInfo, tupleInfo);
+
+		final LocalProperties original = new LocalProperties()
+				.addUniqueFields(new FieldSet(0, 1, 2))
+				.addUniqueFields(new FieldSet(3, 4))
+				.addUniqueFields(new FieldSet(4, 5, 6));
+
+		final LocalProperties result = original.filterBySemanticProperties(semProps, 0);
+		final FieldSet firstKept = new FieldSet(5, 6, 7);
+		final FieldSet secondKept = new FieldSet(3, 4);
+
+		assertNull(result.getGroupedFields());
+		assertNull(result.getOrdering());
+		assertNotNull(result.getUniqueFields());
+		assertEquals(2, result.getUniqueFields().size());
+		assertTrue(result.getUniqueFields().contains(firstKept));
+		assertTrue(result.getUniqueFields().contains(secondKept));
+	}
+
+	/** Spec "0;1;4" covers none of the unique sets completely, so no unique fields must remain. */
+	@Test
+	public void testUniqueFieldsErased() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+		final LocalProperties original = new LocalProperties()
+				.addUniqueFields(new FieldSet(0, 1, 2))
+				.addUniqueFields(new FieldSet(3, 4))
+				.addUniqueFields(new FieldSet(4, 5, 6));
+
+		final LocalProperties result = original.filterBySemanticProperties(semProps, 0);
+
+		assertNull(result.getGroupedFields());
+		assertNull(result.getOrdering());
+		assertNull(result.getUniqueFields());
+	}
+
+	/** Filtering with input index 1 must throw {@link IndexOutOfBoundsException}. */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testInvalidInputIndex() {
+		final SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[] {"0;1"}, null, null, tupleInfo, tupleInfo);
+
+		// every other test in this class filters with input index 0; this one passes 1
+		final LocalProperties grouped = LocalProperties.forGrouping(new FieldList(0, 1));
+		grouped.filterBySemanticProperties(semProps, 1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
new file mode 100644
index 0000000..74126f8
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Key;
+
+import java.io.IOException;
+
+@SuppressWarnings("serial") // this stub declares no serialVersionUID
+public class MockDistribution implements DataDistribution {
+	// Stub distribution for tests: empty bucket boundaries, zero fields, empty write/read bodies.
+	@Override
+	public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+		return new Key<?>[0]; // both arguments are ignored; a new zero-length array is returned each time
+	}
+
+	@Override
+	public int getNumberOfFields() {
+		return 0;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		// the class has no fields, so nothing is written
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		// the class has no fields, so nothing is read
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
new file mode 100644
index 0000000..2b2ab14
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+class MockPartitioner implements Partitioner<Tuple2<Long, Integer>> {
+	// Stub partitioner for tests: every call returns partition 0.
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public int partition(Tuple2<Long, Integer> key, int numPartitions) {
+		return 0; // key and numPartitions are both ignored
+	}
+}
\ No newline at end of file