You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:02 UTC

[23/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
deleted file mode 100644
index 5175d8c..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataexchange;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * This test verifies that the optimizer assigns the correct
- * data exchange mode to a simple forward / shuffle plan.
- *
- * <pre>
- *     (source) -> (map) -> (filter) -> (groupBy / reduce)
- * </pre>
- */
-@SuppressWarnings("serial")
-public class DataExchangeModeForwardTest extends CompilerTestBase {
-
-
-	@Test
-	public void testPipelinedForced() {
-		// PIPELINED_FORCED should result in pipelining all the way
-		verifySimpleForwardPlan(ExecutionMode.PIPELINED_FORCED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
-	}
-
-	@Test
-	public void testPipelined() {
-		// PIPELINED should result in pipelining all the way
-		verifySimpleForwardPlan(ExecutionMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
-	}
-
-	@Test
-	public void testBatch() {
-		// BATCH should batch only the shuffle into the reduce; all other connections stay pipelined
-		verifySimpleForwardPlan(ExecutionMode.BATCH,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
-	}
-
-	@Test
-	public void testBatchForced() {
-		// BATCH_FORCED should result in batching everywhere except the connection into the combiner
-		verifySimpleForwardPlan(ExecutionMode.BATCH_FORCED,
-				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
-				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
-				DataExchangeMode.BATCH, DataExchangeMode.BATCH);
-	}
-
-	private void verifySimpleForwardPlan(ExecutionMode execMode,
-										DataExchangeMode toMap,
-										DataExchangeMode toFilter,
-										DataExchangeMode toKeyExtractor,
-										DataExchangeMode toCombiner,
-										DataExchangeMode toReduce,
-										DataExchangeMode toSink)
-	{
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.getConfig().setExecutionMode(execMode);
-
-			DataSet<String> dataSet = env.readTextFile("/never/accessed");
-			dataSet
-				.map(new MapFunction<String, Integer>() {
-					@Override
-					public Integer map(String value) {
-						return 0;
-					}
-				})
-				.filter(new FilterFunction<Integer>() {
-					@Override
-					public boolean filter(Integer value) {
-						return false;
-					}
-				})
-				.groupBy(new IdentityKeyExtractor<Integer>())
-				.reduceGroup(new Top1GroupReducer<Integer>())
-				.output(new DiscardingOutputFormat<Integer>());
-
-			OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
-			SinkPlanNode sinkNode = optPlan.getDataSinks().iterator().next();
-
-			SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getPredecessor();
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getPredecessor();
-			SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode) combineNode.getPredecessor();
-
-			SingleInputPlanNode filterNode = (SingleInputPlanNode) keyExtractorNode.getPredecessor();
-			SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
-
-			assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
-			assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
-			assertEquals(toKeyExtractor, keyExtractorNode.getInput().getDataExchangeMode());
-			assertEquals(toCombiner, combineNode.getInput().getDataExchangeMode());
-			assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode());
-			assertEquals(toSink, sinkNode.getInput().getDataExchangeMode());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
deleted file mode 100644
index 6b2691a..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataexchange;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.junit.Test;
-
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * This test checks the correct assignment of the DataExchangeMode to
- * connections for programs that branch, but do not re-join the branches.
- *
- * <pre>
- *                      /---> (filter) -> (sink)
- *                     /
- *                    /
- * (source) -> (map) -----------------\
- *                    \               (join) -> (sink)
- *                     \   (source) --/
- *                      \
- *                       \
- *                        \-> (sink)
- * </pre>
- */
-@SuppressWarnings({"serial", "unchecked"})
-public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
-
-	@Test
-	public void testPipelinedForced() {
-		// PIPELINED_FORCED should result in pipelining all the way
-		verifyBranchigPlan(ExecutionMode.PIPELINED_FORCED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED);
-	}
-
-	@Test
-	public void testPipelined() {
-		// PIPELINED should result in pipelining all the way
-		verifyBranchigPlan(ExecutionMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED);
-	}
-
-	@Test
-	public void testBatch() {
-		// BATCH should batch only the two join inputs; all other connections stay pipelined
-		verifyBranchigPlan(ExecutionMode.BATCH,
-				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED, DataExchangeMode.BATCH,
-				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
-				DataExchangeMode.PIPELINED);
-	}
-
-	@Test
-	public void testBatchForced() {
-		// BATCH_FORCED should result in batching all the way
-		verifyBranchigPlan(ExecutionMode.BATCH_FORCED,
-				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
-				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
-				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
-				DataExchangeMode.BATCH);
-	}
-
-	private void verifyBranchigPlan(ExecutionMode execMode,
-									DataExchangeMode toMap,
-									DataExchangeMode toFilter,
-									DataExchangeMode toFilterSink,
-									DataExchangeMode toJoin1,
-									DataExchangeMode toJoin2,
-									DataExchangeMode toJoinSink,
-									DataExchangeMode toDirectSink)
-	{
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.getConfig().setExecutionMode(execMode);
-
-			DataSet<Tuple2<Long, Long>> data = env.generateSequence(1, 100000)
-					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
-						@Override
-						public Tuple2<Long, Long> map(Long value) {
-							return new Tuple2<Long, Long>(value, value);
-						}
-					});
-
-			// output 1
-			data
-					.filter(new FilterFunction<Tuple2<Long, Long>>() {
-						@Override
-						public boolean filter(Tuple2<Long, Long> value) {
-							return false;
-						}
-					})
-					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink1");
-
-			// output 2 does a join before the sink
-			data
-					.join(env.fromElements(new Tuple2<Long, Long>(1L, 2L)))
-					.where(1)
-					.equalTo(0)
-					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("sink2");
-
-			// output 3 is direct
-			data
-					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink3");
-
-			OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
-
-			SinkPlanNode filterSink = findSink(optPlan.getDataSinks(), "sink1");
-			SinkPlanNode joinSink = findSink(optPlan.getDataSinks(), "sink2");
-			SinkPlanNode directSink = findSink(optPlan.getDataSinks(), "sink3");
-
-			SingleInputPlanNode filterNode = (SingleInputPlanNode) filterSink.getPredecessor();
-			SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
-
-			DualInputPlanNode joinNode = (DualInputPlanNode) joinSink.getPredecessor();
-			assertEquals(mapNode, joinNode.getInput1().getSource());
-
-			assertEquals(mapNode, directSink.getPredecessor());
-
-			assertEquals(toFilterSink, filterSink.getInput().getDataExchangeMode());
-			assertEquals(toJoinSink, joinSink.getInput().getDataExchangeMode());
-			assertEquals(toDirectSink, directSink.getInput().getDataExchangeMode());
-
-			assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
-			assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
-
-			assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode());
-			assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
-		for (SinkPlanNode node : collection) {
-			String nodeName = node.getOptimizerNode().getOperator().getName();
-			if (nodeName != null && nodeName.equals(name)) {
-				return node;
-			}
-		}
-
-		throw new IllegalArgumentException("No node with that name was found.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
deleted file mode 100644
index 1a14be5..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataexchange;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
-import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.optimizer.traversals.BranchesVisitor;
-import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
-import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
-import org.junit.Test;
-
-import java.util.Iterator;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * This test checks whether connections are correctly marked as pipeline breaking.
- */
-@SuppressWarnings("serial")
-public class PipelineBreakingTest {
-
-	/**
-	 * Tests that no pipeline breakers are inserted into a simple forward
-	 * pipeline.
-	 *
-	 * <pre>
-	 *     (source) -> (map) -> (filter) -> (groupBy / reduce)
-	 * </pre>
-	 */
-	@Test
-	public void testSimpleForwardPlan() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<String> dataSet = env.readTextFile("/never/accessed");
-			dataSet
-				.map(new MapFunction<String, Integer>() {
-					@Override
-					public Integer map(String value) {
-						return 0;
-					}
-				})
-				.filter(new FilterFunction<Integer>() {
-					@Override
-					public boolean filter(Integer value) {
-						return false;
-					}
-				})
-				.groupBy(new IdentityKeyExtractor<Integer>())
-				.reduceGroup(new Top1GroupReducer<Integer>())
-				.output(new DiscardingOutputFormat<Integer>());
-
-			DataSinkNode sinkNode = convertPlan(env.createProgramPlan()).get(0);
-
-			SingleInputNode reduceNode = (SingleInputNode) sinkNode.getPredecessorNode();
-			SingleInputNode keyExtractorNode = (SingleInputNode) reduceNode.getPredecessorNode();
-
-			SingleInputNode filterNode = (SingleInputNode) keyExtractorNode.getPredecessorNode();
-			SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
-
-			assertFalse(sinkNode.getInputConnection().isBreakingPipeline());
-			assertFalse(reduceNode.getIncomingConnection().isBreakingPipeline());
-			assertFalse(keyExtractorNode.getIncomingConnection().isBreakingPipeline());
-			assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
-			assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests that branching plans, where the branches are not re-joined,
-	 * do not place pipeline breakers.
-	 * 
-	 * <pre>
-	 *                      /---> (filter) -> (sink)
-	 *                     /
-	 *                    /
-	 * (source) -> (map) -----------------\
-	 *                    \               (join) -> (sink)
-	 *                     \   (source) --/
-	 *                      \
-	 *                       \
-	 *                        \-> (sink)
-	 * </pre>
-	 */
-	@Test
-	public void testBranchingPlanNotReJoined() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<Integer> data = env.readTextFile("/never/accessed")
-				.map(new MapFunction<String, Integer>() {
-					@Override
-					public Integer map(String value) {
-						return 0;
-					}
-				});
-
-			// output 1
-			data
-				.filter(new FilterFunction<Integer>() {
-					@Override
-					public boolean filter(Integer value) {
-						return false;
-					}
-				})
-				.output(new DiscardingOutputFormat<Integer>());
-
-			// output 2 does a join before the sink
-			data
-				.join(env.fromElements(1, 2, 3, 4))
-					.where(new IdentityKeyExtractor<Integer>())
-					.equalTo(new IdentityKeyExtractor<Integer>())
-				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
-
-			// output 3 is direct
-			data
-				.output(new DiscardingOutputFormat<Integer>());
-
-			List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
-
-			// gather the optimizer DAG nodes
-
-			DataSinkNode sinkAfterFilter = sinks.get(0);
-			DataSinkNode sinkAfterJoin = sinks.get(1);
-			DataSinkNode sinkDirect = sinks.get(2);
-
-			SingleInputNode filterNode = (SingleInputNode) sinkAfterFilter.getPredecessorNode();
-			SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
-
-			TwoInputNode joinNode = (TwoInputNode) sinkAfterJoin.getPredecessorNode();
-			SingleInputNode joinInput = (SingleInputNode) joinNode.getSecondPredecessorNode();
-
-			// verify the non-pipeline breaking status
-
-			assertFalse(sinkAfterFilter.getInputConnection().isBreakingPipeline());
-			assertFalse(sinkAfterJoin.getInputConnection().isBreakingPipeline());
-			assertFalse(sinkDirect.getInputConnection().isBreakingPipeline());
-
-			assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
-			assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
-
-			assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
-			assertFalse(joinNode.getSecondIncomingConnection().isBreakingPipeline());
-			assertFalse(joinInput.getIncomingConnection().isBreakingPipeline());
-
-			// some other sanity checks on the plan construction (cannot hurt)
-
-			assertEquals(mapNode, ((SingleInputNode) joinNode.getFirstPredecessorNode()).getPredecessorNode());
-			assertEquals(mapNode, sinkDirect.getPredecessorNode());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests that pipeline breakers are placed on branches that are re-joined.
-	 * 
-	 * <pre>
-	 *                                         /-> (sink)
-	 *                                        /
-	 *                         /-> (reduce) -+          /-> (flatmap) -> (sink)
-	 *                        /               \        /
-	 *     (source) -> (map) -                (join) -+-----\
-	 *                        \               /              \
-	 *                         \-> (filter) -+                \
-	 *                                       \                (co group) -> (sink)
-	 *                                        \                /
-	 *                                         \-> (reduce) - /
-	 * </pre>
-	 */
-	@Test
-	public void testReJoinedBranches() {
-		try {
-			// build a test program
-
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
-					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
-						@Override
-						public Tuple2<Long, Long> map(Long value) {
-							return new Tuple2<Long, Long>(value, value);
-						}
-					});
-
-			DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
-			reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			
-			DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() {
-				@Override
-				public boolean filter(Tuple2<Long, Long> value) throws Exception {
-					return false;
-				}
-			});
-			
-			DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered)
-					.where(1).equalTo(1)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-			
-			joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>())
-					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-
-			joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()))
-					.where(0).equalTo(0)
-					.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
-					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
-
-			List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
-
-			// gather the optimizer DAG nodes
-
-			DataSinkNode sinkAfterReduce = sinks.get(0);
-			DataSinkNode sinkAfterFlatMap = sinks.get(1);
-			DataSinkNode sinkAfterCoGroup = sinks.get(2);
-
-			SingleInputNode reduceNode = (SingleInputNode) sinkAfterReduce.getPredecessorNode();
-			SingleInputNode mapNode = (SingleInputNode) reduceNode.getPredecessorNode();
-
-			SingleInputNode flatMapNode = (SingleInputNode) sinkAfterFlatMap.getPredecessorNode();
-			TwoInputNode joinNode = (TwoInputNode) flatMapNode.getPredecessorNode();
-			SingleInputNode filterNode = (SingleInputNode) joinNode.getSecondPredecessorNode();
-
-			TwoInputNode coGroupNode = (TwoInputNode) sinkAfterCoGroup.getPredecessorNode();
-			SingleInputNode otherReduceNode = (SingleInputNode) coGroupNode.getSecondPredecessorNode();
-
-			// test sanity checks (that we constructed the DAG correctly)
-
-			assertEquals(reduceNode, joinNode.getFirstPredecessorNode());
-			assertEquals(mapNode, filterNode.getPredecessorNode());
-			assertEquals(joinNode, coGroupNode.getFirstPredecessorNode());
-			assertEquals(filterNode, otherReduceNode.getPredecessorNode());
-
-			// verify the pipeline breaking status
-
-			assertFalse(sinkAfterReduce.getInputConnection().isBreakingPipeline());
-			assertFalse(sinkAfterFlatMap.getInputConnection().isBreakingPipeline());
-			assertFalse(sinkAfterCoGroup.getInputConnection().isBreakingPipeline());
-
-			assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
-			assertFalse(flatMapNode.getIncomingConnection().isBreakingPipeline());
-			assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
-			assertFalse(coGroupNode.getFirstIncomingConnection().isBreakingPipeline());
-			assertFalse(coGroupNode.getSecondIncomingConnection().isBreakingPipeline());
-
-			// these should be pipeline breakers
-			assertTrue(reduceNode.getIncomingConnection().isBreakingPipeline());
-			assertTrue(filterNode.getIncomingConnection().isBreakingPipeline());
-			assertTrue(otherReduceNode.getIncomingConnection().isBreakingPipeline());
-			assertTrue(joinNode.getSecondIncomingConnection().isBreakingPipeline());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	private static List<DataSinkNode> convertPlan(Plan p) {
-		GraphCreatingVisitor dagCreator =
-				new GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
-
-		// create the DAG
-		p.accept(dagCreator);
-		List<DataSinkNode> sinks = dagCreator.getSinks();
-
-		// build a single root and run the branch tracking logic
-		OptimizerNode rootNode;
-		if (sinks.size() == 1) {
-			rootNode = sinks.get(0);
-		}
-		else {
-			Iterator<DataSinkNode> iter = sinks.iterator();
-			rootNode = iter.next();
-
-			while (iter.hasNext()) {
-				rootNode = new SinkJoiner(rootNode, iter.next());
-			}
-		}
-		rootNode.accept(new IdAndEstimatesVisitor(null));
-		rootNode.accept(new BranchesVisitor());
-
-		return sinks;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
deleted file mode 100644
index 3e32905..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Set;
-
-public class GlobalPropertiesFilteringTest {
-
-	private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
-			new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
-					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
-					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
-			);
-
-	@Test
-	public void testAllErased1() {
-
-		SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setHashPartitioned(new FieldList(0, 1));
-		gprops.addUniqueFieldCombination(new FieldSet(3, 4));
-		gprops.addUniqueFieldCombination(new FieldSet(5, 6));
-
-		GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
-
-		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
-		assertNull(result.getPartitioningFields());
-		assertNull(result.getPartitioningOrdering());
-		assertNull(result.getUniqueFieldCombination());
-	}
-
-	@Test
-	public void testAllErased2() {
-
-		SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[]{"2"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setHashPartitioned(new FieldList(0, 1));
-		gprops.addUniqueFieldCombination(new FieldSet(3, 4));
-		gprops.addUniqueFieldCombination(new FieldSet(5, 6));
-
-		GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
-
-		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
-		assertNull(result.getPartitioningFields());
-		assertNull(result.getPartitioningOrdering());
-		assertNull(result.getUniqueFieldCombination());
-	}
-
-	@Test
-	public void testHashPartitioningPreserved1() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setHashPartitioned(new FieldList(0, 1, 4));
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
-		FieldList pFields = result.getPartitioningFields();
-		assertEquals(3, pFields.size());
-		assertTrue(pFields.contains(0));
-		assertTrue(pFields.contains(1));
-		assertTrue(pFields.contains(4));
-	}
-
-	@Test
-	public void testHashPartitioningPreserved2() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setHashPartitioned(new FieldList(0, 1, 4));
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
-		FieldList pFields = result.getPartitioningFields();
-		assertEquals(3, pFields.size());
-		assertTrue(pFields.contains(1));
-		assertTrue(pFields.contains(2));
-		assertTrue(pFields.contains(3));
-	}
-
-	@Test
-	public void testHashPartitioningErased() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setHashPartitioned(new FieldList(0, 1, 4));
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
-		assertNull(result.getPartitioningFields());
-	}
-
-	@Test
-	public void testAnyPartitioningPreserved1() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setAnyPartitioning(new FieldList(0, 1, 4));
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
-		FieldList pFields = result.getPartitioningFields();
-		assertEquals(3, pFields.size());
-		assertTrue(pFields.contains(0));
-		assertTrue(pFields.contains(1));
-		assertTrue(pFields.contains(4));
-	}
-
-	@Test
-	public void testAnyPartitioningPreserved2() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setAnyPartitioning(new FieldList(0, 1, 4));
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
-		FieldList pFields = result.getPartitioningFields();
-		assertEquals(3, pFields.size());
-		assertTrue(pFields.contains(1));
-		assertTrue(pFields.contains(2));
-		assertTrue(pFields.contains(3));
-	}
-
-	@Test
-	public void testAnyPartitioningErased() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setAnyPartitioning(new FieldList(0, 1, 4));
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
-		assertNull(result.getPartitioningFields());
-	}
-
-	@Test
-	public void testCustomPartitioningPreserved1() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
-		gprops.setCustomPartitioned(new FieldList(0, 4), myP);
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning());
-		FieldList pFields = result.getPartitioningFields();
-		assertEquals(2, pFields.size());
-		assertTrue(pFields.contains(0));
-		assertTrue(pFields.contains(4));
-		assertEquals(myP, result.getCustomPartitioner());
-	}
-
-	@Test
-	public void testCustomPartitioningPreserved2() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
-		gprops.setCustomPartitioned(new FieldList(0, 4), myP);
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning());
-		FieldList pFields = result.getPartitioningFields();
-		assertEquals(2, pFields.size());
-		assertTrue(pFields.contains(1));
-		assertTrue(pFields.contains(3));
-		assertEquals(myP, result.getCustomPartitioner());
-	}
-
-	@Test
-	public void testCustomPartitioningErased() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
-		gprops.setCustomPartitioned(new FieldList(0, 4), myP);
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
-		assertNull(result.getPartitioningFields());
-		assertNull(result.getCustomPartitioner());
-	}
-
-	@Test
-	public void testRangePartitioningPreserved1() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;2;5"}, null, null, tupleInfo, tupleInfo);
-
-		Ordering o = new Ordering();
-		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
-		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
-		o.appendOrdering(2, StringValue.class, Order.ASCENDING);
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setRangePartitioned(o);
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning());
-		FieldList pFields = result.getPartitioningFields();
-		assertEquals(3, pFields.size());
-		assertEquals(1, pFields.get(0).intValue());
-		assertEquals(5, pFields.get(1).intValue());
-		assertEquals(2, pFields.get(2).intValue());
-		Ordering pOrder = result.getPartitioningOrdering();
-		assertEquals(3, pOrder.getNumberOfFields());
-		assertEquals(1, pOrder.getFieldNumber(0).intValue());
-		assertEquals(5, pOrder.getFieldNumber(1).intValue());
-		assertEquals(2, pOrder.getFieldNumber(2).intValue());
-		assertEquals(Order.ASCENDING, pOrder.getOrder(0));
-		assertEquals(Order.DESCENDING, pOrder.getOrder(1));
-		assertEquals(Order.ASCENDING, pOrder.getOrder(2));
-		assertEquals(IntValue.class, pOrder.getType(0));
-		assertEquals(LongValue.class, pOrder.getType(1));
-		assertEquals(StringValue.class, pOrder.getType(2));
-	}
-
-	@Test
-	public void testRangePartitioningPreserved2() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1->3; 2->0; 5->1"}, null, null, tupleInfo, tupleInfo);
-
-		Ordering o = new Ordering();
-		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
-		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
-		o.appendOrdering(2, StringValue.class, Order.ASCENDING);
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setRangePartitioned(o);
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning());
-		FieldList pFields = result.getPartitioningFields();
-		assertEquals(3, pFields.size());
-		assertEquals(3, pFields.get(0).intValue());
-		assertEquals(1, pFields.get(1).intValue());
-		assertEquals(0, pFields.get(2).intValue());
-		Ordering pOrder = result.getPartitioningOrdering();
-		assertEquals(3, pOrder.getNumberOfFields());
-		assertEquals(3, pOrder.getFieldNumber(0).intValue());
-		assertEquals(1, pOrder.getFieldNumber(1).intValue());
-		assertEquals(0, pOrder.getFieldNumber(2).intValue());
-		assertEquals(Order.ASCENDING, pOrder.getOrder(0));
-		assertEquals(Order.DESCENDING, pOrder.getOrder(1));
-		assertEquals(Order.ASCENDING, pOrder.getOrder(2));
-		assertEquals(IntValue.class, pOrder.getType(0));
-		assertEquals(LongValue.class, pOrder.getType(1));
-		assertEquals(StringValue.class, pOrder.getType(2));
-	}
-
-	@Test
-	public void testRangePartitioningErased() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;5"}, null, null, tupleInfo, tupleInfo);
-
-		Ordering o = new Ordering();
-		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
-		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
-		o.appendOrdering(2, StringValue.class, Order.ASCENDING);
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setRangePartitioned(o);
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
-		assertNull(result.getPartitioningOrdering());
-		assertNull(result.getPartitioningFields());
-	}
-
-	@Test
-	public void testRebalancingPreserved() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setForcedRebalanced();
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-
-		assertEquals(PartitioningProperty.FORCED_REBALANCED, result.getPartitioning());
-		assertNull(result.getPartitioningFields());
-	}
-
-	@Test
-	public void testUniqueFieldGroupsPreserved1() {
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
-
-		FieldSet set1 = new FieldSet(0, 1, 2);
-		FieldSet set2 = new FieldSet(3, 4);
-		FieldSet set3 = new FieldSet(4, 5, 6, 7);
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.addUniqueFieldCombination(set1);
-		gprops.addUniqueFieldCombination(set2);
-		gprops.addUniqueFieldCombination(set3);
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-		Set<FieldSet> unique = result.getUniqueFieldCombination();
-		FieldSet expected1 = new FieldSet(0, 1, 2);
-		FieldSet expected2 = new FieldSet(3, 4);
-
-		Assert.assertTrue(unique.size() == 2);
-		Assert.assertTrue(unique.contains(expected1));
-		Assert.assertTrue(unique.contains(expected2));
-	}
-
-	@Test
-	public void testUniqueFieldGroupsPreserved2() {
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->5;1;2;3->6;4"}, null, null, tupleInfo, tupleInfo);
-
-		FieldSet set1 = new FieldSet(0, 1, 2);
-		FieldSet set2 = new FieldSet(3, 4);
-		FieldSet set3 = new FieldSet(4, 5, 6, 7);
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.addUniqueFieldCombination(set1);
-		gprops.addUniqueFieldCombination(set2);
-		gprops.addUniqueFieldCombination(set3);
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-		Set<FieldSet> unique = result.getUniqueFieldCombination();
-		FieldSet expected1 = new FieldSet(1, 2, 5);
-		FieldSet expected2 = new FieldSet(4, 6);
-
-		Assert.assertTrue(unique.size() == 2);
-		Assert.assertTrue(unique.contains(expected1));
-		Assert.assertTrue(unique.contains(expected2));
-	}
-
-	@Test
-	public void testUniqueFieldGroupsErased() {
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0; 3; 5; 6; 7"}, null, null, tupleInfo, tupleInfo);
-
-		FieldSet set1 = new FieldSet(0, 1, 2);
-		FieldSet set2 = new FieldSet(3, 4);
-		FieldSet set3 = new FieldSet(4, 5, 6, 7);
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.addUniqueFieldCombination(set1);
-		gprops.addUniqueFieldCombination(set2);
-		gprops.addUniqueFieldCombination(set3);
-
-		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
-		Assert.assertNull(result.getUniqueFieldCombination());
-	}
-
-	@Test(expected = IndexOutOfBoundsException.class)
-	public void testInvalidInputIndex() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
-		GlobalProperties gprops = new GlobalProperties();
-		gprops.setHashPartitioned(new FieldList(0, 1));
-
-		gprops.filterBySemanticProperties(sprops, 1);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
deleted file mode 100644
index 52826d6..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Test;
-
-public class GlobalPropertiesMatchingTest {
-
-	@Test
-	public void testMatchingAnyPartitioning() {
-		try {
-			
-			RequestedGlobalProperties req = new RequestedGlobalProperties();
-			req.setAnyPartitioning(new FieldSet(6, 2));
-			
-			// match any partitioning
-			{
-				GlobalProperties gp1 = new GlobalProperties();
-				gp1.setAnyPartitioning(new FieldList(2, 6));
-				assertTrue(req.isMetBy(gp1));
-
-				GlobalProperties gp2 = new GlobalProperties();
-				gp2.setAnyPartitioning(new FieldList(6, 2));
-				assertTrue(req.isMetBy(gp2));
-
-				GlobalProperties gp3 = new GlobalProperties();
-				gp3.setAnyPartitioning(new FieldList(6, 2, 4));
-				assertFalse(req.isMetBy(gp3));
-
-				GlobalProperties gp4 = new GlobalProperties();
-				gp4.setAnyPartitioning(new FieldList(6, 1));
-				assertFalse(req.isMetBy(gp4));
-
-				GlobalProperties gp5 = new GlobalProperties();
-				gp5.setAnyPartitioning(new FieldList(2));
-				assertTrue(req.isMetBy(gp5));
-			}
-
-			// match hash partitioning
-			{
-				GlobalProperties gp1 = new GlobalProperties();
-				gp1.setHashPartitioned(new FieldList(2, 6));
-				assertTrue(req.isMetBy(gp1));
-
-				GlobalProperties gp2 = new GlobalProperties();
-				gp2.setHashPartitioned(new FieldList(6, 2));
-				assertTrue(req.isMetBy(gp2));
-
-				GlobalProperties gp3 = new GlobalProperties();
-				gp3.setHashPartitioned(new FieldList(6, 1));
-				assertFalse(req.isMetBy(gp3));
-			}
-			
-			// match range partitioning
-			{
-				GlobalProperties gp1 = new GlobalProperties();
-				gp1.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
-				assertTrue(req.isMetBy(gp1));
-				
-				GlobalProperties gp2 = new GlobalProperties();
-				gp2.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
-				assertTrue(req.isMetBy(gp2));
-
-				GlobalProperties gp3 = new GlobalProperties();
-				gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
-				assertFalse(req.isMetBy(gp3));
-				
-				GlobalProperties gp4 = new GlobalProperties();
-				gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
-				assertTrue(req.isMetBy(gp4));
-			}
-			
-			// match custom partitioning
-			{
-				GlobalProperties gp1 = new GlobalProperties();
-				gp1.setCustomPartitioned(new FieldList(2, 6), new MockPartitioner());
-				assertTrue(req.isMetBy(gp1));
-				
-				GlobalProperties gp2 = new GlobalProperties();
-				gp2.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
-				assertTrue(req.isMetBy(gp2));
-				
-				GlobalProperties gp3 = new GlobalProperties();
-				gp3.setCustomPartitioned(new FieldList(6, 1), new MockPartitioner());
-				assertFalse(req.isMetBy(gp3));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testMatchingCustomPartitioning() {
-		try {
-			final Partitioner<Tuple2<Long, Integer>> partitioner = new MockPartitioner();
-			
-			RequestedGlobalProperties req = new RequestedGlobalProperties();
-			req.setCustomPartitioned(new FieldSet(6, 2), partitioner);
-			
-			// match custom partitionings
-			{
-				GlobalProperties gp1 = new GlobalProperties();
-				gp1.setCustomPartitioned(new FieldList(2, 6), partitioner);
-				assertTrue(req.isMetBy(gp1));
-				
-				GlobalProperties gp2 = new GlobalProperties();
-				gp2.setCustomPartitioned(new FieldList(6, 2), partitioner);
-				assertTrue(req.isMetBy(gp2));
-				
-				GlobalProperties gp3 = new GlobalProperties();
-				gp3.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
-				assertFalse(req.isMetBy(gp3));
-			}
-			
-			// cannot match other types of partitionings
-			{
-				GlobalProperties gp1 = new GlobalProperties();
-				gp1.setAnyPartitioning(new FieldList(6, 2));
-				assertFalse(req.isMetBy(gp1));
-				
-				GlobalProperties gp2 = new GlobalProperties();
-				gp2.setHashPartitioned(new FieldList(6, 2));
-				assertFalse(req.isMetBy(gp2));
-				
-				GlobalProperties gp3 = new GlobalProperties();
-				gp3.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
-				assertFalse(req.isMetBy(gp3));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testStrictlyMatchingAnyPartitioning() {
-
-		RequestedGlobalProperties req = new RequestedGlobalProperties();
-		req.setAnyPartitioning(new FieldList(6, 2));
-
-		// match any partitioning
-		{
-			GlobalProperties gp1 = new GlobalProperties();
-			gp1.setAnyPartitioning(new FieldList(6, 2));
-			assertTrue(req.isMetBy(gp1));
-
-			GlobalProperties gp2 = new GlobalProperties();
-			gp2.setAnyPartitioning(new FieldList(2, 6));
-			assertFalse(req.isMetBy(gp2));
-
-			GlobalProperties gp3 = new GlobalProperties();
-			gp3.setAnyPartitioning(new FieldList(6, 2, 3));
-			assertFalse(req.isMetBy(gp3));
-
-			GlobalProperties gp4 = new GlobalProperties();
-			gp3.setAnyPartitioning(new FieldList(6, 1));
-			assertFalse(req.isMetBy(gp3));
-
-			GlobalProperties gp5 = new GlobalProperties();
-			gp4.setAnyPartitioning(new FieldList(2));
-			assertFalse(req.isMetBy(gp4));
-		}
-
-		// match hash partitioning
-		{
-			GlobalProperties gp1 = new GlobalProperties();
-			gp1.setHashPartitioned(new FieldList(6, 2));
-			assertTrue(req.isMetBy(gp1));
-
-			GlobalProperties gp2 = new GlobalProperties();
-			gp2.setHashPartitioned(new FieldList(2, 6));
-			assertFalse(req.isMetBy(gp2));
-
-			GlobalProperties gp3 = new GlobalProperties();
-			gp3.setHashPartitioned(new FieldList(6, 1));
-			assertFalse(req.isMetBy(gp3));
-		}
-
-		// match range partitioning
-		{
-			GlobalProperties gp1 = new GlobalProperties();
-			gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
-			assertTrue(req.isMetBy(gp1));
-
-			GlobalProperties gp2 = new GlobalProperties();
-			gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
-			assertFalse(req.isMetBy(gp2));
-
-			GlobalProperties gp3 = new GlobalProperties();
-			gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
-			assertFalse(req.isMetBy(gp3));
-
-			GlobalProperties gp4 = new GlobalProperties();
-			gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
-			assertFalse(req.isMetBy(gp4));
-		}
-
-	}
-
-	@Test
-	public void testStrictlyMatchingHashPartitioning() {
-
-		RequestedGlobalProperties req = new RequestedGlobalProperties();
-		req.setHashPartitioned(new FieldList(6, 2));
-
-		// match any partitioning
-		{
-			GlobalProperties gp1 = new GlobalProperties();
-			gp1.setAnyPartitioning(new FieldList(6, 2));
-			assertFalse(req.isMetBy(gp1));
-
-			GlobalProperties gp2 = new GlobalProperties();
-			gp2.setAnyPartitioning(new FieldList(2, 6));
-			assertFalse(req.isMetBy(gp2));
-
-			GlobalProperties gp3 = new GlobalProperties();
-			gp3.setAnyPartitioning(new FieldList(6, 1));
-			assertFalse(req.isMetBy(gp3));
-
-			GlobalProperties gp4 = new GlobalProperties();
-			gp4.setAnyPartitioning(new FieldList(2));
-			assertFalse(req.isMetBy(gp4));
-		}
-
-		// match hash partitioning
-		{
-			GlobalProperties gp1 = new GlobalProperties();
-			gp1.setHashPartitioned(new FieldList(6, 2));
-			assertTrue(req.isMetBy(gp1));
-
-			GlobalProperties gp2 = new GlobalProperties();
-			gp2.setHashPartitioned(new FieldList(2, 6));
-			assertFalse(req.isMetBy(gp2));
-
-			GlobalProperties gp3 = new GlobalProperties();
-			gp3.setHashPartitioned(new FieldList(6, 1));
-			assertFalse(req.isMetBy(gp3));
-
-			GlobalProperties gp4 = new GlobalProperties();
-			gp4.setHashPartitioned(new FieldList(6, 2, 0));
-			assertFalse(req.isMetBy(gp4));
-		}
-
-		// match range partitioning
-		{
-			GlobalProperties gp1 = new GlobalProperties();
-			gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
-			assertFalse(req.isMetBy(gp1));
-
-			GlobalProperties gp2 = new GlobalProperties();
-			gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
-			assertFalse(req.isMetBy(gp2));
-
-			GlobalProperties gp3 = new GlobalProperties();
-			gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
-			assertFalse(req.isMetBy(gp3));
-
-			GlobalProperties gp4 = new GlobalProperties();
-			gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
-			assertFalse(req.isMetBy(gp4));
-		}
-
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
deleted file mode 100644
index 0868720..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.junit.Test;
-
-public class GlobalPropertiesPushdownTest {
-
-	@Test
-	public void testAnyPartitioningPushedDown() {
-		try {
-			RequestedGlobalProperties req = new RequestedGlobalProperties();
-			req.setAnyPartitioning(new FieldSet(3, 1));
-			
-			RequestedGlobalProperties preserved = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
-			assertEquals(PartitioningProperty.ANY_PARTITIONING, preserved.getPartitioning());
-			assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
-			
-			RequestedGlobalProperties nonPreserved = req.filterBySemanticProperties(getNonePreservingSemProps(), 0);
-			assertTrue(nonPreserved == null || nonPreserved.isTrivial());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testHashPartitioningPushedDown() {
-		try {
-			RequestedGlobalProperties req = new RequestedGlobalProperties();
-			req.setHashPartitioned(new FieldSet(3, 1));
-			
-			RequestedGlobalProperties preserved = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
-			assertEquals(PartitioningProperty.HASH_PARTITIONED, preserved.getPartitioning());
-			assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
-			
-			RequestedGlobalProperties nonPreserved = req.filterBySemanticProperties(getNonePreservingSemProps(), 0);
-			assertTrue(nonPreserved == null || nonPreserved.isTrivial());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testCustomPartitioningNotPushedDown() {
-		try {
-			RequestedGlobalProperties req = new RequestedGlobalProperties();
-			req.setCustomPartitioned(new FieldSet(3, 1), new MockPartitioner());
-			
-			RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
-			assertTrue(pushedDown == null || pushedDown.isTrivial());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testForcedReblancingNotPushedDown() {
-		try {
-			RequestedGlobalProperties req = new RequestedGlobalProperties();
-			req.setForceRebalancing();
-			
-			RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
-			assertTrue(pushedDown == null || pushedDown.isTrivial());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private static SemanticProperties getAllPreservingSemProps() {
-		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
-	}
-	
-	private static SemanticProperties getNonePreservingSemProps() {
-		return new SingleInputSemanticProperties();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
deleted file mode 100644
index 1ff62ed..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-
-public class LocalPropertiesFilteringTest {
-
-	private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
-			new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
-					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
-					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
-			);
-
-	@Test
-	public void testAllErased1() {
-
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2));
-		lProps = lProps.addUniqueFields(new FieldSet(3,4));
-		lProps = lProps.addUniqueFields(new FieldSet(5,6));
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
-		assertNull(filtered.getGroupedFields());
-		assertNull(filtered.getOrdering());
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testAllErased2() {
-
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"5"}, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2));
-		lProps = lProps.addUniqueFields(new FieldSet(3,4));
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
-		assertNull(filtered.getGroupedFields());
-		assertNull(filtered.getOrdering());
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testGroupingPreserved1() {
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
-		assertNotNull(filtered.getGroupedFields());
-		assertEquals(3, filtered.getGroupedFields().size());
-		assertTrue(filtered.getGroupedFields().contains(0));
-		assertTrue(filtered.getGroupedFields().contains(2));
-		assertTrue(filtered.getGroupedFields().contains(3));
-		assertNull(filtered.getOrdering());
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testGroupingPreserved2() {
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0;3->7"}, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
-		assertNotNull(filtered.getGroupedFields());
-		assertEquals(3, filtered.getGroupedFields().size());
-		assertTrue(filtered.getGroupedFields().contains(4));
-		assertTrue(filtered.getGroupedFields().contains(0));
-		assertTrue(filtered.getGroupedFields().contains(7));
-		assertNull(filtered.getOrdering());
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testGroupingErased() {
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0"}, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
-		assertNull(filtered.getGroupedFields());
-		assertNull(filtered.getOrdering());
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testSortingPreserved1() {
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;5"}, null, null, tupleInfo, tupleInfo);
-
-		Ordering o = new Ordering();
-		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
-		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
-		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
-		LocalProperties lProps = LocalProperties.forOrdering(o);
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-		FieldList gFields = filtered.getGroupedFields();
-		Ordering order = filtered.getOrdering();
-
-		assertNotNull(gFields);
-		assertEquals(3, gFields.size());
-		assertTrue(gFields.contains(0));
-		assertTrue(gFields.contains(2));
-		assertTrue(gFields.contains(5));
-		assertNotNull(order);
-		assertEquals(3, order.getNumberOfFields());
-		assertEquals(2, order.getFieldNumber(0).intValue());
-		assertEquals(0, order.getFieldNumber(1).intValue());
-		assertEquals(5, order.getFieldNumber(2).intValue());
-		assertEquals(Order.ASCENDING, order.getOrder(0));
-		assertEquals(Order.DESCENDING, order.getOrder(1));
-		assertEquals(Order.DESCENDING, order.getOrder(2));
-		assertEquals(IntValue.class, order.getType(0));
-		assertEquals(StringValue.class, order.getType(1));
-		assertEquals(LongValue.class, order.getType(2));
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testSortingPreserved2() {
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->3;2->7;5->1"}, null, null, tupleInfo, tupleInfo);
-
-		Ordering o = new Ordering();
-		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
-		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
-		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
-		LocalProperties lProps = LocalProperties.forOrdering(o);
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-		FieldList gFields = filtered.getGroupedFields();
-		Ordering order = filtered.getOrdering();
-
-		assertNotNull(gFields);
-		assertEquals(3, gFields.size());
-		assertTrue(gFields.contains(3));
-		assertTrue(gFields.contains(7));
-		assertTrue(gFields.contains(1));
-		assertNotNull(order);
-		assertEquals(3, order.getNumberOfFields());
-		assertEquals(7, order.getFieldNumber(0).intValue());
-		assertEquals(3, order.getFieldNumber(1).intValue());
-		assertEquals(1, order.getFieldNumber(2).intValue());
-		assertEquals(Order.ASCENDING, order.getOrder(0));
-		assertEquals(Order.DESCENDING, order.getOrder(1));
-		assertEquals(Order.DESCENDING, order.getOrder(2));
-		assertEquals(IntValue.class, order.getType(0));
-		assertEquals(StringValue.class, order.getType(1));
-		assertEquals(LongValue.class, order.getType(2));
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testSortingPreserved3() {
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
-
-		Ordering o = new Ordering();
-		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
-		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
-		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
-		LocalProperties lProps = LocalProperties.forOrdering(o);
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-		FieldList gFields = filtered.getGroupedFields();
-		Ordering order = filtered.getOrdering();
-
-		assertNotNull(gFields);
-		assertEquals(2, gFields.size());
-		assertTrue(gFields.contains(0));
-		assertTrue(gFields.contains(2));
-		assertNotNull(order);
-		assertEquals(2, order.getNumberOfFields());
-		assertEquals(2, order.getFieldNumber(0).intValue());
-		assertEquals(0, order.getFieldNumber(1).intValue());
-		assertEquals(Order.ASCENDING, order.getOrder(0));
-		assertEquals(Order.DESCENDING, order.getOrder(1));
-		assertEquals(IntValue.class, order.getType(0));
-		assertEquals(StringValue.class, order.getType(1));
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testSortingPreserved4() {
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"2->7;5"}, null, null, tupleInfo, tupleInfo);
-
-		Ordering o = new Ordering();
-		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
-		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
-		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
-		LocalProperties lProps = LocalProperties.forOrdering(o);
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-		FieldList gFields = filtered.getGroupedFields();
-		Ordering order = filtered.getOrdering();
-
-		assertNotNull(gFields);
-		assertEquals(1, gFields.size());
-		assertTrue(gFields.contains(7));
-		assertNotNull(order);
-		assertEquals(1, order.getNumberOfFields());
-		assertEquals(7, order.getFieldNumber(0).intValue());
-		assertEquals(Order.ASCENDING, order.getOrder(0));
-		assertEquals(IntValue.class, order.getType(0));
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testSortingErased() {
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;5"}, null, null, tupleInfo, tupleInfo);
-
-		Ordering o = new Ordering();
-		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
-		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
-		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
-		LocalProperties lProps = LocalProperties.forOrdering(o);
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-		FieldList gFields = filtered.getGroupedFields();
-		Ordering order = filtered.getOrdering();
-
-		assertNull(gFields);
-		assertNull(order);
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test
-	public void testUniqueFieldsPreserved1() {
-
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lProps = new LocalProperties();
-		lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
-		lProps = lProps.addUniqueFields(new FieldSet(3,4));
-		lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-		FieldSet expected1 = new FieldSet(0,1,2);
-		FieldSet expected2 = new FieldSet(3,4);
-
-		assertNull(filtered.getGroupedFields());
-		assertNull(filtered.getOrdering());
-		assertNotNull(filtered.getUniqueFields());
-		assertEquals(2, filtered.getUniqueFields().size());
-		assertTrue(filtered.getUniqueFields().contains(expected1));
-		assertTrue(filtered.getUniqueFields().contains(expected2));
-	}
-
-	@Test
-	public void testUniqueFieldsPreserved2() {
-
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(1,2));
-		lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
-		lProps = lProps.addUniqueFields(new FieldSet(3,4));
-		lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-		FieldSet expected1 = new FieldSet(0,1,2);
-		FieldSet expected2 = new FieldSet(3,4);
-
-		assertNull(filtered.getOrdering());
-		assertNotNull(filtered.getGroupedFields());
-		assertEquals(2, filtered.getGroupedFields().size());
-		assertTrue(filtered.getGroupedFields().contains(1));
-		assertTrue(filtered.getGroupedFields().contains(2));
-		assertNotNull(filtered.getUniqueFields());
-		assertEquals(2, filtered.getUniqueFields().size());
-		assertTrue(filtered.getUniqueFields().contains(expected1));
-		assertTrue(filtered.getUniqueFields().contains(expected2));
-	}
-
-	@Test
-	public void testUniqueFieldsPreserved3() {
-
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->7;1->6;2->5;3->4;4->3"}, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lProps = new LocalProperties();
-		lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
-		lProps = lProps.addUniqueFields(new FieldSet(3,4));
-		lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-		FieldSet expected1 = new FieldSet(5,6,7);
-		FieldSet expected2 = new FieldSet(3,4);
-
-		assertNull(filtered.getGroupedFields());
-		assertNull(filtered.getOrdering());
-		assertNotNull(filtered.getUniqueFields());
-		assertEquals(2, filtered.getUniqueFields().size());
-		assertTrue(filtered.getUniqueFields().contains(expected1));
-		assertTrue(filtered.getUniqueFields().contains(expected2));
-	}
-
-	@Test
-	public void testUniqueFieldsErased() {
-
-		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lProps = new LocalProperties();
-		lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
-		lProps = lProps.addUniqueFields(new FieldSet(3,4));
-		lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
-
-		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
-
-		assertNull(filtered.getGroupedFields());
-		assertNull(filtered.getOrdering());
-		assertNull(filtered.getUniqueFields());
-	}
-
-	@Test(expected = IndexOutOfBoundsException.class)
-	public void testInvalidInputIndex() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
-
-		LocalProperties lprops = LocalProperties.forGrouping(new FieldList(0,1));
-
-		lprops.filterBySemanticProperties(sprops, 1);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
deleted file mode 100644
index 74126f8..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-
-import java.io.IOException;
-
-@SuppressWarnings("serial")
-public class MockDistribution implements DataDistribution {
-
-	@Override
-	public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-		return new Key<?>[0];
-	}
-
-	@Override
-	public int getNumberOfFields() {
-		return 0;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
deleted file mode 100644
index 2b2ab14..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-class MockPartitioner implements Partitioner<Tuple2<Long, Integer>> {
-	
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public int partition(Tuple2<Long, Integer> key, int numPartitions) {
-		return 0;
-	}
-}
\ No newline at end of file