You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:45 UTC

[06/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
new file mode 100644
index 0000000..e65758f
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+/** Optimizer tests for iterations nested in, or referenced from the closure of, another iteration. */
+@SuppressWarnings({"serial", "unchecked"})
+public class NestedIterationsTest extends CompilerTestBase {
+
+	/** Checks that a bulk iteration opened inside another bulk iteration is rejected by the optimizer. */
+	@Test
+	public void testRejectNestedBulkIterations() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Long> data = env.generateSequence(1, 100);
+
+			IterativeDataSet<Long> outerIteration = data.iterate(100);
+
+			// the inner iteration is opened on a data set derived from the outer iteration
+			IterativeDataSet<Long> innerIteration = outerIteration.map(new IdentityMapper<Long>()).iterate(100);
+			DataSet<Long> innerResult = innerIteration.closeWith(innerIteration.map(new IdentityMapper<Long>()));
+
+			DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>()));
+			outerResult.print();
+
+			Plan p = env.createProgramPlan();
+			try {
+				compileNoStats(p);
+				fail("Expected a CompilerException: nested bulk iterations must be rejected by the optimizer.");
+			}
+			catch (CompilerException e) {
+				assertTrue(e.getMessage().toLowerCase().contains("nested iterations"));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/** Checks that a delta iteration opened inside another delta iteration is rejected by the optimizer. */
+	@Test
+	public void testRejectNestedWorksetIterations() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> outerIteration = data.iterateDelta(data, 100, 0);
+
+			DataSet<Tuple2<Long, Long>> inOuter = outerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+			// the inner iteration is opened on a data set derived from the outer iteration's workset
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> innerIteration = inOuter.iterateDelta(inOuter, 100, 0);
+			DataSet<Tuple2<Long, Long>> inInner = innerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> innerResult = innerIteration.closeWith(inInner, inInner).map(new IdentityMapper<Tuple2<Long,Long>>());
+
+			DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult);
+			outerResult.print();
+
+			Plan p = env.createProgramPlan();
+			try {
+				compileNoStats(p);
+				fail("Expected a CompilerException: nested workset iterations must be rejected by the optimizer.");
+			}
+			catch (CompilerException e) {
+				assertTrue(e.getMessage().toLowerCase().contains("nested iterations"));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/** Checks that a closed bulk iteration may be joined inside a second bulk iteration and still compiles. */
+	@Test
+	public void testBulkIterationInClosure() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Long> data1 = env.generateSequence(1, 100);
+			DataSet<Long> data2 = env.generateSequence(1, 100);
+
+			IterativeDataSet<Long> firstIteration = data1.iterate(100);
+
+			DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));
+
+			IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
+
+			DataSet<Long> joined = mainIteration.join(firstResult)
+					.where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
+					.with(new DummyFlatJoinFunction<Long>());
+
+			DataSet<Long> mainResult = mainIteration.closeWith(joined);
+			mainResult.print();
+
+			Plan p = env.createProgramPlan();
+
+			// optimizer should be able to translate this
+			OptimizedPlan op = compileNoStats(p);
+
+			// job graph generator should be able to translate this
+			new JobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/** Checks that a closed delta iteration may be joined inside a second delta iteration and still compiles. */
+	@Test
+	public void testDeltaIterationInClosure() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Long, Long>> data1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple2<Long, Long>> data2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> firstIteration = data1.iterateDelta(data1, 100, 0);
+
+			DataSet<Tuple2<Long, Long>> inFirst = firstIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> firstResult = firstIteration.closeWith(inFirst, inFirst).map(new IdentityMapper<Tuple2<Long,Long>>());
+
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> mainIteration = data2.iterateDelta(data2, 100, 0);
+
+			DataSet<Tuple2<Long, Long>> joined = mainIteration.getWorkset().join(firstResult).where(0).equalTo(0)
+							.projectFirst(0).projectSecond(0);
+
+			DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
+
+			mainResult.print();
+
+			Plan p = env.createProgramPlan();
+
+			// optimizer should be able to translate this
+			OptimizedPlan op = compileNoStats(p);
+
+			// job graph generator should be able to translate this
+			new JobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
new file mode 100644
index 0000000..2b42f85
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitionPushdownTest extends CompilerTestBase {
+
+	/** Aggregating on (0, 1) and then on (0): each reducer is expected to be hash-partitioned on its own key. */
+	@Test
+	public void testPartitioningNotPushedDown() {
+		try {
+			final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+			@SuppressWarnings("unchecked")
+			final DataSet<Tuple3<Long, Long, Long>> source = environment.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+			// aggregate on the wider key (0, 1) first, then on its prefix (0)
+			source.groupBy(0, 1).sum(2)
+				.groupBy(0).sum(1)
+				.print();
+
+			final OptimizedPlan optimized = compileNoStats(environment.createProgramPlan());
+			final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+
+			// walk upstream from the sink: second reducer, its combiner, then the first reducer
+			final SingleInputPlanNode secondReducer = (SingleInputPlanNode) sinkNode.getInput().getSource();
+			final SingleInputPlanNode secondCombiner = (SingleInputPlanNode) secondReducer.getInput().getSource();
+			final SingleInputPlanNode firstReducer = (SingleInputPlanNode) secondCombiner.getInput().getSource();
+
+			assertEquals(ShipStrategyType.PARTITION_HASH, secondReducer.getInput().getShipStrategy());
+			assertEquals(new FieldList(0), secondReducer.getInput().getShipStrategyKeys());
+
+			assertEquals(ShipStrategyType.FORWARD, secondCombiner.getInput().getShipStrategy());
+
+			assertEquals(ShipStrategyType.PARTITION_HASH, firstReducer.getInput().getShipStrategy());
+			assertEquals(new FieldList(0, 1), firstReducer.getInput().getShipStrategyKeys());
+		}
+		catch (Exception failure) {
+			failure.printStackTrace();
+			fail(failure.getMessage());
+		}
+	}
+
+	/** Aggregating on (0) and then on (0, 1): the second reducer is expected to reuse the partitioning on (0). */
+	@Test
+	public void testPartitioningReused() {
+		try {
+			final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+			@SuppressWarnings("unchecked")
+			final DataSet<Tuple3<Long, Long, Long>> source = environment.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+			// aggregate on the narrow key (0) first, then on the wider key (0, 1)
+			source.groupBy(0).sum(1)
+				.groupBy(0, 1).sum(2)
+				.print();
+
+			final OptimizedPlan optimized = compileNoStats(environment.createProgramPlan());
+			final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+
+			final SingleInputPlanNode secondReducer = (SingleInputPlanNode) sinkNode.getInput().getSource();
+			final SingleInputPlanNode firstReducer = (SingleInputPlanNode) secondReducer.getInput().getSource();
+
+			// the second aggregation is expected to receive its input without re-partitioning
+			assertEquals(ShipStrategyType.FORWARD, secondReducer.getInput().getShipStrategy());
+
+			assertEquals(ShipStrategyType.PARTITION_HASH, firstReducer.getInput().getShipStrategy());
+			assertEquals(new FieldList(0), firstReducer.getInput().getShipStrategyKeys());
+		}
+		catch (Exception failure) {
+			failure.printStackTrace();
+			fail(failure.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
new file mode 100644
index 0000000..16684dc
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -0,0 +1,845 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class PartitioningReusageTest extends CompilerTestBase {
+
+	@Test
+	public void noPreviousPartitioningJoin1() {
+		// Joins two CSV sources on (0,1) = (0,1); neither input is explicitly partitioned beforehand.
+		// The resulting join node is handed to checkValidJoinInputProperties.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.join(right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(0, 1)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void noPreviousPartitioningJoin2() {
+		// Joins two CSV sources on (0,1) = (2,1); neither input is explicitly partitioned beforehand.
+		// The resulting join node is handed to checkValidJoinInputProperties.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.join(right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin1() {
+		// Only the first input is hash-partitioned up front, on (0,1), with fields 0 and 1 declared
+		// as forwarded by the mapper; the join key is (0,1) = (0,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(0, 1)
+				.map(new MockMapper())
+				.withForwardedFields("0;1")
+				.join(right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(0, 1).with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin2() {
+		// First input pre-partitioned on (0,1) (fields 0;1 forwarded); join key (0,1) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(0, 1)
+				.map(new MockMapper())
+				.withForwardedFields("0;1")
+				.join(right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(2, 1).with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin3() {
+		// Second input pre-partitioned on (2,1) (fields 2;1 forwarded); join key (0,1) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left
+				.join(
+						right.partitionByHash(2, 1).map(new MockMapper()).withForwardedFields("2;1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin4() {
+		// First input pre-partitioned on (0) only (field 0 forwarded); join key (0,1) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(0)
+				.map(new MockMapper())
+				.withForwardedFields("0")
+				.join(right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(2, 1).with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin5() {
+		// Second input pre-partitioned on (2) only (field 2 forwarded); join key (0,1) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left
+				.join(
+						right.partitionByHash(2).map(new MockMapper()).withForwardedFields("2"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin1() {
+		// Both inputs pre-partitioned on (0,1) (fields 0;1 forwarded); join key (0,1) = (0,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(0, 1)
+				.map(new MockMapper())
+				.withForwardedFields("0;1")
+				.join(
+						right.partitionByHash(0, 1).map(new MockMapper()).withForwardedFields("0;1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(0, 1)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+
+	@Test
+	public void reuseBothPartitioningJoin2() {
+		// First input pre-partitioned on (0,1), second on (1,2); join key (0,1) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(0, 1)
+				.map(new MockMapper())
+				.withForwardedFields("0;1")
+				.join(
+						right.partitionByHash(1, 2).map(new MockMapper()).withForwardedFields("1;2"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin3() {
+		// First input pre-partitioned on (0), second on (2,1); join key (0,1) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(0)
+				.map(new MockMapper())
+				.withForwardedFields("0")
+				.join(
+						right.partitionByHash(2, 1).map(new MockMapper()).withForwardedFields("2;1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin4() {
+		// First input pre-partitioned on (0,2), second on (1); join key (0,2) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(0, 2)
+				.map(new MockMapper())
+				.withForwardedFields("0;2")
+				.join(
+						right.partitionByHash(1).map(new MockMapper()).withForwardedFields("1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 2).equalTo(2, 1)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin5() {
+		// First input pre-partitioned on (2), second on (1); join key (0,2) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(2)
+				.map(new MockMapper())
+				.withForwardedFields("2")
+				.join(
+						right.partitionByHash(1).map(new MockMapper()).withForwardedFields("1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 2).equalTo(2, 1)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin6() {
+		// First input pre-partitioned on (0), second on (1); join key (0,2) = (1,2).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(0)
+				.map(new MockMapper())
+				.withForwardedFields("0")
+				.join(
+						right.partitionByHash(1).map(new MockMapper()).withForwardedFields("1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 2).equalTo(1, 2)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin7() {
+		// First input pre-partitioned on (2), second on (1); join key (0,2) = (1,2).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> joinResult = left.partitionByHash(2)
+				.map(new MockMapper())
+				.withForwardedFields("2")
+				.join(
+						right.partitionByHash(1).map(new MockMapper()).withForwardedFields("1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0, 2).equalTo(1, 2)
+				.with(new MockJoin());
+
+		joinResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidJoinInputProperties(joinNode);
+	}
+
+
+	@Test
+	public void noPreviousPartitioningCoGroup1() {
+		// Co-groups two CSV sources on (0,1) = (0,1); neither input is explicitly partitioned beforehand.
+		// The resulting co-group node is handed to checkValidCoGroupInputProperties.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> coGroupResult = left.coGroup(right)
+				.where(0, 1).equalTo(0, 1)
+				.with(new MockCoGroup());
+
+		coGroupResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroupNode);
+	}
+
+	@Test
+	public void noPreviousPartitioningCoGroup2() {
+		// Co-groups two CSV sources on (0,1) = (2,1); neither input is explicitly partitioned beforehand.
+		// The resulting co-group node is handed to checkValidCoGroupInputProperties.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> coGroupResult = left.coGroup(right)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockCoGroup());
+
+		coGroupResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroupNode);
+	}
+
+	@Test
+	public void reuseSinglePartitioningCoGroup1() {
+		// Only the first input is hash-partitioned up front, on (0,1), with fields 0 and 1 declared
+		// as forwarded by the mapper; the co-group key is (0,1) = (0,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> coGroupResult = left.partitionByHash(0, 1)
+				.map(new MockMapper())
+				.withForwardedFields("0;1")
+				.coGroup(right)
+				.where(0, 1).equalTo(0, 1).with(new MockCoGroup());
+
+		coGroupResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroupNode);
+	}
+
+	@Test
+	public void reuseSinglePartitioningCoGroup2() {
+		// First input pre-partitioned on (0,1) (fields 0;1 forwarded); co-group key (0,1) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> coGroupResult = left.partitionByHash(0, 1)
+				.map(new MockMapper())
+				.withForwardedFields("0;1")
+				.coGroup(right)
+				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+		coGroupResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroupNode);
+	}
+
+	@Test
+	public void reuseSinglePartitioningCoGroup3() {
+		// Second input pre-partitioned on (2,1) (fields 2;1 forwarded); co-group key (0,1) = (2,1).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		final DataSet<Tuple3<Integer, Integer, Integer>> left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		final DataSet<Tuple3<Integer, Integer, Integer>> right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		final DataSet<Tuple3<Integer, Integer, Integer>> coGroupResult = left
+				.coGroup(
+						right.partitionByHash(2, 1).map(new MockMapper()).withForwardedFields("2;1"))
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockCoGroup());
+
+		coGroupResult.print();
+		final OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
+
+		final SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		final DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroupNode);
+	}
+
+	/**
+	 * First input is hash partitioned on field 0 only and forwarded through a mapper;
+	 * the coGroup uses keys (0,1) on the first and (2,1) on the second input.
+	 */
+	@Test
+	public void reuseSinglePartitioningCoGroup4() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		// explicit partitioning on a subset of the first input's key fields
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
+				.partitionByHash(0)
+				.map(new MockMapper()).withForwardedFields("0");
+
+		partitionedLeft
+				.coGroup(right)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockCoGroup())
+				.print();
+
+		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
+
+		// the coGroup is the node that feeds the single sink
+		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
+	}
+
+	/**
+	 * Second input is hash partitioned on field 2 only and forwarded through a mapper;
+	 * the coGroup uses keys (0,1) on the first and (2,1) on the second input.
+	 */
+	@Test
+	public void reuseSinglePartitioningCoGroup5() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		// explicit partitioning on a subset of the second input's key fields
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
+				.partitionByHash(2)
+				.map(new MockMapper())
+				.withForwardedFields("2");
+
+		left
+				.coGroup(partitionedRight)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockCoGroup())
+				.print();
+
+		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
+
+		// the coGroup is the node that feeds the single sink
+		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
+	}
+
+	/**
+	 * Both inputs are hash partitioned on fields (0,1) and forwarded through mappers;
+	 * the coGroup uses keys (0,1) on the first and (0,1) on the second input.
+	 */
+	@Test
+	public void reuseBothPartitioningCoGroup1() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
+				.partitionByHash(0, 1)
+				.map(new MockMapper()).withForwardedFields("0;1");
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
+				.partitionByHash(0, 1)
+				.map(new MockMapper())
+				.withForwardedFields("0;1");
+
+		partitionedLeft
+				.coGroup(partitionedRight)
+				.where(0, 1).equalTo(0, 1)
+				.with(new MockCoGroup())
+				.print();
+
+		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
+
+		// the coGroup is the node that feeds the single sink
+		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
+	}
+
+
+	/**
+	 * First input is hash partitioned on (0,1), second input on (1,2), both forwarded through mappers;
+	 * the coGroup uses keys (0,1) on the first and (2,1) on the second input.
+	 */
+	@Test
+	public void reuseBothPartitioningCoGroup2() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
+				.partitionByHash(0, 1)
+				.map(new MockMapper()).withForwardedFields("0;1");
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
+				.partitionByHash(1, 2)
+				.map(new MockMapper())
+				.withForwardedFields("1;2");
+
+		partitionedLeft
+				.coGroup(partitionedRight)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockCoGroup())
+				.print();
+
+		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
+
+		// the coGroup is the node that feeds the single sink
+		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
+	}
+
+	/**
+	 * First input is hash partitioned on field 0, second input on (2,1), both forwarded through mappers;
+	 * the coGroup uses keys (0,1) on the first and (2,1) on the second input.
+	 */
+	@Test
+	public void reuseBothPartitioningCoGroup3() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
+				.partitionByHash(0)
+				.map(new MockMapper()).withForwardedFields("0");
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
+				.partitionByHash(2, 1)
+				.map(new MockMapper())
+				.withForwardedFields("2;1");
+
+		partitionedLeft
+				.coGroup(partitionedRight)
+				.where(0, 1).equalTo(2, 1)
+				.with(new MockCoGroup())
+				.print();
+
+		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
+
+		// the coGroup is the node that feeds the single sink
+		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
+	}
+
+	/**
+	 * First input is hash partitioned on (0,2), second input on field 1, both forwarded through mappers;
+	 * the coGroup uses keys (0,2) on the first and (2,1) on the second input.
+	 */
+	@Test
+	public void reuseBothPartitioningCoGroup4() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
+				.partitionByHash(0, 2)
+				.map(new MockMapper()).withForwardedFields("0;2");
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
+				.partitionByHash(1)
+				.map(new MockMapper())
+				.withForwardedFields("1");
+
+		partitionedLeft
+				.coGroup(partitionedRight)
+				.where(0, 2).equalTo(2, 1)
+				.with(new MockCoGroup())
+				.print();
+
+		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
+
+		// the coGroup is the node that feeds the single sink
+		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
+	}
+
+	/**
+	 * First input is hash partitioned on field 2, second input on field 1, both forwarded through mappers;
+	 * the coGroup uses keys (0,2) on the first and (2,1) on the second input.
+	 */
+	@Test
+	public void reuseBothPartitioningCoGroup5() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
+				.partitionByHash(2)
+				.map(new MockMapper()).withForwardedFields("2");
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
+				.partitionByHash(1)
+				.map(new MockMapper())
+				.withForwardedFields("1");
+
+		partitionedLeft
+				.coGroup(partitionedRight)
+				.where(0, 2).equalTo(2, 1)
+				.with(new MockCoGroup())
+				.print();
+
+		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
+
+		// the coGroup is the node that feeds the single sink
+		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
+	}
+
+	/**
+	 * Both inputs are hash partitioned on field 2 and forwarded through mappers;
+	 * the coGroup uses keys (0,2) on the first and (1,2) on the second input.
+	 */
+	@Test
+	public void reuseBothPartitioningCoGroup6() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
+				.partitionByHash(2)
+				.map(new MockMapper()).withForwardedFields("2");
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
+				.partitionByHash(2)
+				.map(new MockMapper())
+				.withForwardedFields("2");
+
+		partitionedLeft
+				.coGroup(partitionedRight)
+				.where(0, 2).equalTo(1, 2)
+				.with(new MockCoGroup())
+				.print();
+
+		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
+
+		// the coGroup is the node that feeds the single sink
+		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
+	}
+
+	/**
+	 * First input is hash partitioned on field 2, second input on field 1, both forwarded through mappers;
+	 * the coGroup uses keys (0,2) on the first and (1,2) on the second input.
+	 */
+	@Test
+	public void reuseBothPartitioningCoGroup7() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
+				.partitionByHash(2)
+				.map(new MockMapper()).withForwardedFields("2");
+		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
+				.partitionByHash(1)
+				.map(new MockMapper())
+				.withForwardedFields("1");
+
+		partitionedLeft
+				.coGroup(partitionedRight)
+				.where(0, 2).equalTo(1, 2)
+				.with(new MockCoGroup())
+				.print();
+
+		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
+
+		// the coGroup is the node that feeds the single sink
+		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
+		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
+	}
+
+
+
+	/**
+	 * Checks that the global properties of a join's two inputs form a valid combination.
+	 *
+	 * <p>If both inputs are hash partitioned, they must be partitioned on the same number of fields,
+	 * every partitioning field must be one of the join keys of its input, and the i-th partitioning
+	 * fields of both inputs must sit at the same position in their respective key lists.
+	 * The combinations "fully replicated / randomly partitioned" (in either order) are accepted
+	 * without any field check.
+	 *
+	 * @param join the join plan node whose inputs are inspected
+	 * @throws UnsupportedOperationException if the inputs have any other combination of partitionings
+	 */
+	private void checkValidJoinInputProperties(DualInputPlanNode join) {
+
+		GlobalProperties inProps1 = join.getInput1().getGlobalProperties();
+		GlobalProperties inProps2 = join.getInput2().getGlobalProperties();
+
+		if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+				inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+			// check that both inputs are hash partitioned on the same fields
+			FieldList pFields1 = inProps1.getPartitioningFields();
+			FieldList pFields2 = inProps2.getPartitioningFields();
+
+			assertTrue("Inputs are not the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2,
+					pFields1.size() == pFields2.size());
+
+			FieldList reqPFields1 = join.getKeysForInput1();
+			FieldList reqPFields2 = join.getKeysForInput2();
+
+			for(int i=0; i<pFields1.size(); i++) {
+
+				// get fields
+				int f1 = pFields1.get(i);
+				int f2 = pFields2.get(i);
+
+				// check that field positions in original key field list are identical
+				int pos1 = getPosInFieldList(f1, reqPFields1);
+				int pos2 = getPosInFieldList(f2, reqPFields2);
+
+				if(pos1 < 0) {
+					fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1);
+				}
+				if(pos2 < 0) {
+					fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2);
+				}
+				if(pos1 != pos2) {
+					fail("Inputs are not partitioned on the same key fields");
+				}
+			}
+
+		}
+		else if(inProps1.getPartitioning() == PartitioningProperty.FULL_REPLICATION &&
+				inProps2.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+			// we are good. No need to check for fields
+		}
+		else if(inProps1.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED &&
+				inProps2.getPartitioning() == PartitioningProperty.FULL_REPLICATION) {
+			// we are good. No need to check for fields
+		}
+		else {
+			// message previously referred to "coGroupinputs" (copy-paste from the coGroup checker)
+			throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned or fully replicated join inputs");
+		}
+
+	}
+
+	/**
+	 * Checks that both inputs of a coGroup are hash partitioned on matching key fields.
+	 *
+	 * <p>Both inputs must be partitioned on the same number of fields, every partitioning field
+	 * must be one of the coGroup keys of its input, and the i-th partitioning fields of both inputs
+	 * must sit at the same position in their respective key lists.
+	 *
+	 * @param coGroup the coGroup plan node whose inputs are inspected
+	 * @throws UnsupportedOperationException if the inputs are not both hash partitioned
+	 */
+	private void checkValidCoGroupInputProperties(DualInputPlanNode coGroup) {
+
+		GlobalProperties firstProps = coGroup.getInput1().getGlobalProperties();
+		GlobalProperties secondProps = coGroup.getInput2().getGlobalProperties();
+
+		boolean bothHashPartitioned =
+				firstProps.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+				secondProps.getPartitioning() == PartitioningProperty.HASH_PARTITIONED;
+
+		// only the hash/hash combination is supported by this checker
+		if (!bothHashPartitioned) {
+			throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs");
+		}
+
+		FieldList firstPartFields = firstProps.getPartitioningFields();
+		FieldList secondPartFields = secondProps.getPartitioningFields();
+
+		assertTrue("Inputs are not the same number of fields. Input 1: "+firstPartFields+", Input 2: "+secondPartFields,
+				firstPartFields.size() == secondPartFields.size());
+
+		FieldList firstKeys = coGroup.getKeysForInput1();
+		FieldList secondKeys = coGroup.getKeysForInput2();
+
+		for (int idx = 0; idx < firstPartFields.size(); idx++) {
+
+			int firstField = firstPartFields.get(idx);
+			int secondField = secondPartFields.get(idx);
+
+			// positions of the partitioning fields within the respective key lists
+			int firstPos = getPosInFieldList(firstField, firstKeys);
+			int secondPos = getPosInFieldList(secondField, secondKeys);
+
+			if (firstPos < 0) {
+				fail("Input 1 is partitioned on field "+firstField+" which is not contained in the key set "+firstKeys);
+			}
+			if (secondPos < 0) {
+				fail("Input 2 is partitioned on field "+secondField+" which is not contained in the key set "+secondKeys);
+			}
+			if (firstPos != secondPos) {
+				fail("Inputs are not partitioned on the same key fields");
+			}
+		}
+
+	}
+
+	/**
+	 * Returns the index of the first occurrence of {@code field} in {@code list},
+	 * or -1 if the list does not contain the field.
+	 */
+	private int getPosInFieldList(int field, FieldList list) {
+
+		for (int idx = 0; idx < list.size(); idx++) {
+			if (field == list.get(idx)) {
+				return idx;
+			}
+		}
+		// scanned the whole list without a match
+		return -1;
+
+	}
+
+
+
+	public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+		@Override
+		public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception {
+			return null;
+		}
+	}
+
+	public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>,
+			Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+		@Override
+		public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
+			return null;
+		}
+	}
+
+	public static class MockCoGroup implements CoGroupFunction<Tuple3<Integer, Integer, Integer>,
+				Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second,
+							Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
+
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
new file mode 100644
index 0000000..86f01b0
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Compiles small programs and asserts that the temp mode of specific plan inputs
+ * breaks the pipeline ({@code getTempMode().breaksPipeline()}).
+ */
+@SuppressWarnings("serial")
+public class PipelineBreakerTest extends CompilerTestBase {
+
+	/**
+	 * A mapper whose regular input and broadcast set both derive from the same source:
+	 * the mapper's regular input must break the pipeline.
+	 */
+	@Test
+	public void testPipelineBreakerWithBroadcastVariable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(64);
+
+			DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+
+			// 'source' is consumed both through the map chain and as broadcast set "bc"
+			DataSet<Long> result = source.map(new IdentityMapper<Long>())
+										.map(new IdentityMapper<Long>())
+											.withBroadcastSet(source, "bc");
+
+			result.print();
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+
+			assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * A mapper with two broadcast sets, one of which is a reduce over the same source
+	 * that feeds the mapper's regular input: the regular input must break the pipeline.
+	 */
+	@Test
+	public void testPipelineBreakerBroadcastedAllReduce() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(64);
+
+			DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+
+			DataSet<Long> bcInput1 = sourceWithMapper
+										.map(new IdentityMapper<Long>())
+										.reduce(new SelectOneReducer<Long>());
+			DataSet<Long> bcInput2 = env.generateSequence(1, 10);
+
+			DataSet<Long> result = sourceWithMapper
+					.map(new IdentityMapper<Long>())
+							.withBroadcastSet(bcInput1, "bc1")
+							.withBroadcastSet(bcInput2, "bc2");
+
+			result.print();
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+
+			assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Inside a bulk iteration, the step function's root mapper receives the partial solution
+	 * and a reduce result as broadcast sets: the mapper's regular input must break the pipeline.
+	 */
+	@Test
+	public void testPipelineBreakerBroadcastedPartialSolution() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(64);
+
+			DataSet<Long> initialSource = env.generateSequence(1, 10);
+			IterativeDataSet<Long> iteration = initialSource.iterate(100);
+
+			DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+
+			DataSet<Long> bcInput1 = sourceWithMapper
+										.map(new IdentityMapper<Long>())
+										.reduce(new SelectOneReducer<Long>());
+
+			DataSet<Long> result = sourceWithMapper
+					.map(new IdentityMapper<Long>())
+							.withBroadcastSet(iteration, "bc2")
+							.withBroadcastSet(bcInput1, "bc1");
+
+			iteration.closeWith(result).print();
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) iterationPlanNode.getRootOfStepFunction();
+
+			assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Crosses a mapped source with the source itself under each nested-loop local strategy hint
+	 * and asserts which cross input breaks the pipeline: input 1 for the OUTER_FIRST hints,
+	 * input 2 for the OUTER_SECOND hints. (The method name keeps its original spelling.)
+	 */
+	@Test
+	public void testPilelineBreakerWithCross() {
+		try {
+			DualInputPlanNode blockedOuterFirst =
+					compileCrossWithLocalStrategyHint(Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
+			assertTrue(blockedOuterFirst.getInput1().getTempMode().breaksPipeline());
+
+			DualInputPlanNode blockedOuterSecond =
+					compileCrossWithLocalStrategyHint(Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
+			assertTrue(blockedOuterSecond.getInput2().getTempMode().breaksPipeline());
+
+			DualInputPlanNode streamedOuterFirst =
+					compileCrossWithLocalStrategyHint(Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
+			assertTrue(streamedOuterFirst.getInput1().getTempMode().breaksPipeline());
+
+			DualInputPlanNode streamedOuterSecond =
+					compileCrossWithLocalStrategyHint(Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
+			assertTrue(streamedOuterSecond.getInput2().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Builds "source -> map -> cross(source) -> print" in a fresh environment (parallelism 64),
+	 * passes the given value as {@link Optimizer#HINT_LOCAL_STRATEGY} to the cross, compiles the
+	 * plan and returns the node feeding the single sink, cast to a dual-input node (the cross).
+	 *
+	 * @param localStrategyHint value to set for the local strategy hint on the cross operator
+	 * @return the plan node that is the source of the sink's input
+	 */
+	private DualInputPlanNode compileCrossWithLocalStrategyHint(String localStrategyHint) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(64);
+
+		DataSet<Long> initialSource = env.generateSequence(1, 10);
+
+		Configuration conf = new Configuration();
+		conf.setString(Optimizer.HINT_LOCAL_STRATEGY, localStrategyHint);
+		initialSource
+			.map(new IdentityMapper<Long>())
+			.cross(initialSource).withParameters(conf)
+			.print();
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		SinkPlanNode sink = op.getDataSinks().iterator().next();
+		return (DualInputPlanNode) sink.getInput().getSource();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
new file mode 100644
index 0000000..7be2b16
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -0,0 +1,897 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@SuppressWarnings({"serial"})
+public class PropertyDataSourceTest extends CompilerTestBase {
+
+	// Empty collection passed to env.fromCollection(...) by the tests that need a (Long, SomePojo, String) source.
+	private List<Tuple3<Long, SomePojo, String>> tuple3PojoData = new ArrayList<Tuple3<Long, SomePojo, String>>();
+	// Explicit type information for the tuple above: Long, the extracted type of SomePojo, String.
+	private TupleTypeInfo<Tuple3<Long, SomePojo, String>> tuple3PojoType = new TupleTypeInfo<Tuple3<Long, SomePojo, String>>(
+			BasicTypeInfo.LONG_TYPE_INFO,
+			TypeExtractor.createTypeInfo(SomePojo.class),
+			BasicTypeInfo.STRING_TYPE_INFO
+	);
+
+	/**
+	 * Declares the splits of a CSV source as partitioned by field 0 and checks that the compiled
+	 * source node reports ANY_PARTITIONING on field 0, with no grouping and no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedSource1() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report expected vs. actual values on failure
+		Assert.assertEquals(new FieldSet(0), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+
+	}
+
+	/**
+	 * Declares the splits of a CSV source as partitioned by fields (1, 0) and checks that the compiled
+	 * source node reports ANY_PARTITIONING on the field set {0, 1}, with no grouping and no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedSource2() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(1, 0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report expected vs. actual values on failure
+		Assert.assertEquals(new FieldSet(0, 1), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+
+	}
+
+	/**
+	 * Declares the splits of a (Long, SomePojo, String) source as partitioned by "*" and checks that
+	 * the compiled source node reports ANY_PARTITIONING on the field set {0, 1, 2, 3, 4},
+	 * with no grouping and no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedSource3() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("*");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report expected vs. actual values on failure
+		Assert.assertEquals(new FieldSet(0, 1, 2, 3, 4), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+
+	}
+
+	/**
+	 * Declares the splits of a (Long, SomePojo, String) source as partitioned by "f1" (the POJO field)
+	 * and checks that the compiled source node reports ANY_PARTITIONING on the field set {1, 2, 3},
+	 * with no grouping and no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedSource4() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report expected vs. actual values on failure
+		Assert.assertEquals(new FieldSet(1, 2, 3), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+
+	}
+
+	/**
+	 * Declares the splits of a (Long, SomePojo, String) source as partitioned by "f1.stringField"
+	 * and checks that the compiled source node reports ANY_PARTITIONING on the field set {3},
+	 * with no grouping and no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedSource5() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.stringField");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report expected vs. actual values on failure
+		Assert.assertEquals(new FieldSet(3), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+
+	}
+
+	/**
+	 * Declares the splits of a Tuple3 source as partitioned on the field expression
+	 * "f1.intField; f2" and expects the source to report partitioning on the flat
+	 * fields 2 and 4, with no grouping and no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedSource6() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.intField; f2");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(2, 4), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a CSV Tuple2 source as partitioned under the "byDate" label
+	 * on fields 1 and 0, and expects the source to report custom partitioning on fields
+	 * 0 and 1 with a non-null custom partitioner, and no grouping or ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedSource7() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("byDate", 1, 0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(0, 1), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNotNull(gprops.getCustomPartitioner());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a CSV Tuple2 source as partitioned on field 0 and grouped
+	 * on field 0, and expects both the partitioning and the grouping on field 0 to be
+	 * reported, with no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedGroupedSource1() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0)
+				.splitsGroupedBy(0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(0), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertEquals(new FieldSet(0), new FieldSet(lprops.getGroupedFields().toArray()));
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a CSV Tuple2 source as partitioned on field 0 and grouped
+	 * on fields 1 and 0, and expects partitioning on field 0 and grouping on fields
+	 * 0 and 1 to be reported, with no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedGroupedSource2() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0)
+				.splitsGroupedBy(1, 0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(0), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertEquals(new FieldSet(0, 1), new FieldSet(lprops.getGroupedFields().toArray()));
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+
+	/**
+	 * Declares the splits of a CSV Tuple2 source as partitioned on field 1 but grouped
+	 * on field 0. The grouping does not cover the partitioning field, and the test
+	 * expects only the partitioning on field 1 to be reported (no grouped fields,
+	 * no ordering).
+	 */
+	@Test
+	public void checkSinglePartitionedGroupedSource3() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(1)
+				.splitsGroupedBy(0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(1), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a CSV Tuple2 source as partitioned on fields 0 and 1 but
+	 * grouped only on field 0. The grouping covers just part of the partitioning key,
+	 * and the test expects only the partitioning on fields 0 and 1 to be reported
+	 * (no grouped fields, no ordering).
+	 */
+	@Test
+	public void checkSinglePartitionedGroupedSource4() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0, 1)
+				.splitsGroupedBy(0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(0, 1), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a Tuple3 source as partitioned and grouped on field "f2",
+	 * and expects both partitioning and grouping to be reported on flat field 4,
+	 * with no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedGroupedSource5() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f2")
+				.splitsGroupedBy("f2");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(4), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertEquals(new FieldSet(4), new FieldSet(lprops.getGroupedFields().toArray()));
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+
+	/**
+	 * Declares the splits of a Tuple3 source as partitioned on "f1.intField" and grouped
+	 * on "f0; f1.intField", and expects partitioning on flat field 2 and grouping on
+	 * flat fields 0 and 2 to be reported, with no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedGroupedSource6() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.intField")
+				.splitsGroupedBy("f0; f1.intField");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(2), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertEquals(new FieldSet(0, 2), new FieldSet(lprops.getGroupedFields().toArray()));
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+
+	/**
+	 * Declares the splits of a Tuple3 source as partitioned on "f1.intField" and grouped
+	 * on the whole composite field "f1", and expects partitioning on flat field 2 and
+	 * grouping on flat fields 1, 2 and 3 to be reported, with no ordering.
+	 */
+	@Test
+	public void checkSinglePartitionedGroupedSource7() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.intField")
+				.splitsGroupedBy("f1");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(2), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertEquals(new FieldSet(1, 2, 3), new FieldSet(lprops.getGroupedFields().toArray()));
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a Tuple3 source as partitioned on the whole composite field
+	 * "f1" but grouped only on "f1.stringField". The grouping covers just part of the
+	 * partitioning key, and the test expects only the partitioning on flat fields
+	 * 1, 2 and 3 to be reported (no grouped fields, no ordering).
+	 */
+	@Test
+	public void checkSinglePartitionedGroupedSource8() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1")
+				.splitsGroupedBy("f1.stringField");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(1, 2, 3), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+
+	/**
+	 * Declares the splits of a CSV Tuple2 source as partitioned on field 1 and ordered
+	 * ascending on field 1. The test expects partitioning on field 1 and a grouping on
+	 * field 1 to be reported, while the ordering itself is expected to be null.
+	 */
+	@Test
+	public void checkSinglePartitionedOrderedSource1() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(1)
+				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(1), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertEquals(new FieldSet(1), new FieldSet(lprops.getGroupedFields().toArray()));
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a CSV Tuple2 source as partitioned on field 1 and ordered
+	 * on field 1 ascending, then field 0 descending. The test expects partitioning on
+	 * field 1 and a grouping on fields 1 and 0 to be reported, while the ordering
+	 * itself is expected to be null.
+	 */
+	@Test
+	public void checkSinglePartitionedOrderedSource2() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(1)
+				.splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(1), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertEquals(new FieldSet(1, 0), new FieldSet(lprops.getGroupedFields().toArray()));
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+
+	/**
+	 * Declares the splits of a CSV Tuple2 source as partitioned on field 0 but ordered
+	 * on field 1. The order key does not cover the partitioning field, and the test
+	 * expects only the partitioning on field 0 to be reported (no grouped fields,
+	 * no ordering).
+	 */
+	@Test
+	public void checkSinglePartitionedOrderedSource3() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0)
+				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(0), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a CSV Tuple2 source as partitioned on fields 0 and 1 but
+	 * ordered (descending) only on field 1. The order key covers just part of the
+	 * partitioning key, and the test expects only the partitioning on fields 0 and 1
+	 * to be reported (no grouped fields, no ordering).
+	 */
+	@Test
+	public void checkSinglePartitionedOrderedSource4() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0, 1)
+				.splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(0, 1), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a Tuple3 source as partitioned on "f1.intField" and ordered
+	 * on "f0; f1.intField" (ascending, descending). The test expects partitioning on
+	 * flat field 2 and a grouping on flat fields 0 and 2 to be reported, while the
+	 * ordering itself is expected to be null.
+	 */
+	@Test
+	public void checkSinglePartitionedOrderedSource5() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.intField")
+				.splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(2), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertEquals(new FieldSet(0, 2), new FieldSet(lprops.getGroupedFields().toArray()));
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+
+	/**
+	 * Declares the splits of a Tuple3 source as partitioned on "f1.intField" and ordered
+	 * (descending) on the whole composite field "f1". The test expects partitioning on
+	 * flat field 2 and a grouping on flat fields 1, 2 and 3 to be reported, while the
+	 * ordering itself is expected to be null.
+	 */
+	@Test
+	public void checkSinglePartitionedOrderedSource6() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.intField")
+				.splitsOrderedBy("f1", new Order[]{Order.DESCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(2), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertEquals(new FieldSet(1, 2, 3), new FieldSet(lprops.getGroupedFields().toArray()));
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+	/**
+	 * Declares the splits of a Tuple3 source as partitioned on the whole composite field
+	 * "f1" but ordered only on "f1.stringField". The order key covers just part of the
+	 * partitioning key, and the test expects only the partitioning on flat fields
+	 * 1, 2 and 3 to be reported (no grouped fields, no ordering).
+	 */
+	@Test
+	public void checkSinglePartitionedOrderedSource7() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1")
+				.splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the single sink's predecessor is the source whose properties are inspected
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(1, 2, 3), new FieldSet(gprops.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
+		Assert.assertNull(lprops.getGroupedFields());
+		Assert.assertNull(lprops.getOrdering());
+	}
+
+
+	/**
+	 * Unions two CSV Tuple2 sources whose splits are both declared as partitioned under
+	 * the same "byDate" label on field 0. The test expects each source to report custom
+	 * partitioning on field 0 without grouping or ordering, and expects the two custom
+	 * partitioners to be equal.
+	 */
+	@Test
+	public void checkCoPartitionedSources1() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data1 =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data1.getSplitDataProperties()
+				.splitsPartitionedBy("byDate", 0);
+
+		DataSource<Tuple2<Long, String>> data2 =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data2.getSplitDataProperties()
+				.splitsPartitionedBy("byDate", 0);
+
+		data1.union(data2).print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the sink is fed by the union node, whose two inputs are the sources under test
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		NAryUnionPlanNode unionNode = (NAryUnionPlanNode) sinkNode.getPredecessor();
+		SourcePlanNode sourceNode1 = (SourcePlanNode) unionNode.getListOfInputs().get(0).getSource();
+		SourcePlanNode sourceNode2 = (SourcePlanNode) unionNode.getListOfInputs().get(1).getSource();
+
+		GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+		LocalProperties lprops1 = sourceNode1.getLocalProperties();
+		GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+		LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(0), new FieldSet(gprops1.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, gprops1.getPartitioning());
+		Assert.assertNull(lprops1.getGroupedFields());
+		Assert.assertNull(lprops1.getOrdering());
+
+		Assert.assertEquals(new FieldSet(0), new FieldSet(gprops2.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, gprops2.getPartitioning());
+		Assert.assertNull(lprops2.getGroupedFields());
+		Assert.assertNull(lprops2.getOrdering());
+
+		// guard against null first: assertEquals would treat two null partitioners as equal
+		Assert.assertNotNull(gprops1.getCustomPartitioner());
+		Assert.assertEquals(gprops1.getCustomPartitioner(), gprops2.getCustomPartitioner());
+	}
+
+	/**
+	 * Unions two CSV Tuple2 sources whose splits are declared as partitioned on field 0
+	 * under different labels ("byCountry" and "byDate"). The test expects each source to
+	 * report custom partitioning on field 0 without grouping or ordering, and expects
+	 * the two custom partitioners to differ.
+	 */
+	@Test
+	public void checkCoPartitionedSources2() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data1 =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data1.getSplitDataProperties()
+				.splitsPartitionedBy("byCountry", 0);
+
+		DataSource<Tuple2<Long, String>> data2 =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data2.getSplitDataProperties()
+				.splitsPartitionedBy("byDate", 0);
+
+		data1.union(data2).print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// the sink is fed by the union node, whose two inputs are the sources under test
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		NAryUnionPlanNode unionNode = (NAryUnionPlanNode) sinkNode.getPredecessor();
+		SourcePlanNode sourceNode1 = (SourcePlanNode) unionNode.getListOfInputs().get(0).getSource();
+		SourcePlanNode sourceNode2 = (SourcePlanNode) unionNode.getListOfInputs().get(1).getSource();
+
+		GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+		LocalProperties lprops1 = sourceNode1.getLocalProperties();
+		GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+		LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+		// assertEquals/assertNull report the offending value on failure, unlike assertTrue
+		Assert.assertEquals(new FieldSet(0), new FieldSet(gprops1.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, gprops1.getPartitioning());
+		Assert.assertNull(lprops1.getGroupedFields());
+		Assert.assertNull(lprops1.getOrdering());
+
+		Assert.assertEquals(new FieldSet(0), new FieldSet(gprops2.getPartitioningFields().toArray()));
+		Assert.assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, gprops2.getPartitioning());
+		Assert.assertNull(lprops2.getGroupedFields());
+		Assert.assertNull(lprops2.getOrdering());
+
+		// different partition labels must not yield interchangeable partitioners
+		Assert.assertFalse(gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
+	}
+
+
+	public static class SomePojo { // nested value type used as field f1 of the Tuple3 test data in this file
+		public double doubleField; // never addressed by name in these tests; presumably flat position 1 (confirm)
+		public int intField; // addressed as "f1.intField"; the tests expect flat position 2
+		public String stringField; // addressed as "f1.stringField"; the tests expect flat position 3
+	}
+
+}
+
+