Posted to commits@flink.apache.org by se...@apache.org on 2014/11/21 16:58:03 UTC

[1/2] incubator-flink git commit: [APIs] Enhance test coverage for CollectionInputFormat and add tests for failed serializations of user code objects

Repository: incubator-flink
Updated Branches:
  refs/heads/master 02e2857a9 -> cf54a1c2a


[APIs] Enhance test coverage for CollectionInputFormat and add tests for failed serializations of user code objects


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d7853fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d7853fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d7853fd3

Branch: refs/heads/master
Commit: d7853fd3157310cae2e66dfc0e9ff146905bdb33
Parents: 02e2857
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 20 15:25:21 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 21 16:56:26 2014 +0100

----------------------------------------------------------------------
 .../flink/util/InstantiationUtilTest.java       |  86 ++++++++
 .../api/java/io/CollectionInputFormat.java      |   7 +-
 .../api/java/io/CollectionInputFormatTest.java  | 202 ++++++++++++++++---
 .../flink/api/java/io/CsvInputFormatTest.java   |   7 +-
 4 files changed, 267 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
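
For context on the mechanism the new tests rely on: Java serialization discovers private writeObject/readObject methods reflectively and invokes them in place of the default field-by-field logic, so a hook that throws simulates a user-code object that cannot be (de)serialized. A minimal, self-contained sketch of that mechanism (class name and message are illustrative, not part of the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    class FailsOnWrite implements Serializable {
        private static final long serialVersionUID = 1L;

        // Invoked reflectively by ObjectOutputStream instead of the default logic.
        private void writeObject(ObjectOutputStream out) throws IOException {
            throw new IOException("simulated user-code failure");
        }
    }

    // new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(new FailsOnWrite());
    // -> surfaces the IOException thrown by the hook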


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
index bf4fc8c..aded919 100644
--- a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
@@ -19,17 +19,21 @@
 package org.apache.flink.util;
 
 import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class InstantiationUtilTest {
 
@@ -79,6 +83,88 @@ public class InstantiationUtilTest {
 
 		assertEquals("Serialized record is not equal after serialization.", toSerialize, deserialized);
 	}
+	
+	@Test
+	public void testWriteToConfigFailingSerialization() {
+		try {
+			final String key1 = "testkey1";
+			final String key2 = "testkey2";
+			final Configuration config = new Configuration();
+			
+			try {
+				InstantiationUtil.writeObjectToConfig(new TestClassWriteFails(), config, "ignored");
+				fail("should throw an exception");
+			}
+			catch (TestException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Wrong exception type - exception not properly forwarded");
+			}
+			
+			InstantiationUtil.writeObjectToConfig(new TestClassReadFails(), config, key1);
+			InstantiationUtil.writeObjectToConfig(new TestClassReadFailsCNF(), config, key2);
+			
+			try {
+				InstantiationUtil.readObjectFromConfig(config, key1, getClass().getClassLoader());
+				fail("should throw an exception");
+			}
+			catch (TestException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Wrong exception type - exception not properly forwarded");
+			}
+			
+			try {
+				InstantiationUtil.readObjectFromConfig(config, key2, getClass().getClassLoader());
+				fail("should throw an exception");
+			}
+			catch (ClassNotFoundException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Wrong exception type - exception not properly forwarded");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
+	// --------------------------------------------------------------------------------------------
+	
 	private class TestClass {}
+	
+	private static class TestException extends IOException {
+		private static final long serialVersionUID = 1L;
+	}
+	
+	private static class TestClassWriteFails implements java.io.Serializable {
+		
+		private static final long serialVersionUID = 1L;
+
+		private void writeObject(ObjectOutputStream out) throws IOException {
+			throw new TestException();
+		}
+	}
+	
+	private static class TestClassReadFails implements java.io.Serializable {
+		
+		private static final long serialVersionUID = 1L;
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			throw new TestException();
+		}
+	}
+	
+	private static class TestClassReadFailsCNF implements java.io.Serializable {
+		
+		private static final long serialVersionUID = 1L;
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			throw new ClassNotFoundException("test exception");
+		}
+	}
 }
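
For contrast with the failure cases above, the happy path through the same two InstantiationUtil methods looks roughly like this (signatures as used in the test; key and value are illustrative):

    Configuration config = new Configuration();
    InstantiationUtil.writeObjectToConfig(new StringValue("hello"), config, "some.key");

    Object restored = InstantiationUtil.readObjectFromConfig(
            config, "some.key", getClass().getClassLoader());
    // `restored` is an independent, deserialized copy of the original value;
    // both calls declare IOException/ClassNotFoundException, as exercised above.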

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 89adf96..b999ede 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -80,8 +80,9 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		out.defaultWriteObject();
 		out.writeInt(dataSet.size());
 		
+		OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
 		for (T element : dataSet){
-			serializer.serialize(element, new OutputViewObjectOutputStreamWrapper(out));
+			serializer.serialize(element, wrapper);
 		}
 	}
 
@@ -91,10 +92,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		int collectionLength = in.readInt();
 		List<T> list = new ArrayList<T>(collectionLength);
 		
-
+		InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
 		for (int i = 0; i < collectionLength; i++){
 			T element = serializer.createInstance();
-			element = serializer.deserialize(element, new InputViewObjectInputStreamWrapper(in));
+			element = serializer.deserialize(element, wrapper);
 			list.add(element);
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 11a018c..948d22f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
 import static org.junit.Assert.assertEquals;
@@ -26,8 +25,11 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -38,11 +40,13 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 public class CollectionInputFormatTest {
-	public static class ElementType{
-		private int id;
+	
+	public static class ElementType {
+		private final int id;
 
 		public ElementType(){
 			this(-1);
@@ -52,37 +56,43 @@ public class CollectionInputFormatTest {
 			this.id = id;
 		}
 
-		public int getId(){return id;}
+		public int getId() {
+			return id;
+		}
 
 		@Override
-		public boolean equals(Object obj){
-			if(obj != null && obj instanceof ElementType){
+		public boolean equals(Object obj) {
+			if (obj != null && obj instanceof ElementType) {
 				ElementType et = (ElementType) obj;
-
 				return et.getId() == this.getId();
-			}else {
+			} else {
 				return false;
 			}
 		}
+		
+		@Override
+		public int hashCode() {
+			return id;
+		}
 	}
 
 	@Test
-	public void testSerializability(){
-		Collection<ElementType> inputCollection = new ArrayList<ElementType>();
-		ElementType element1 = new ElementType(1);
-		ElementType element2 = new ElementType(2);
-		ElementType element3 = new ElementType(3);
-		inputCollection.add(element1);
-		inputCollection.add(element2);
-		inputCollection.add(element3);
-
-		@SuppressWarnings("unchecked")
-		TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
-
-		CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
-				info.createSerializer());
-
-		try{
+	public void testSerializability() {
+		try {
+			Collection<ElementType> inputCollection = new ArrayList<ElementType>();
+			ElementType element1 = new ElementType(1);
+			ElementType element2 = new ElementType(2);
+			ElementType element3 = new ElementType(3);
+			inputCollection.add(element1);
+			inputCollection.add(element2);
+			inputCollection.add(element3);
+	
+			@SuppressWarnings("unchecked")
+			TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
+	
+			CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
+					info.createSerializer());
+
 			ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 			ObjectOutputStream out = new ObjectOutputStream(buffer);
 
@@ -108,10 +118,10 @@ public class CollectionInputFormatTest {
 
 				assertEquals(expectedElement, actualElement);
 			}
-		}catch(IOException ex){
-			fail(ex.toString());
-		}catch(ClassNotFoundException ex){
-			fail(ex.toString());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.toString());
 		}
 	}
 	
@@ -157,7 +167,6 @@ public class CollectionInputFormatTest {
 		};
 		
 		try {
-			
 			List<String> inputCollection = Arrays.asList(data);
 			CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer());
 			
@@ -190,4 +199,139 @@ public class CollectionInputFormatTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testSerializationFailure() {
+		try {
+			// a mock serializer that fails when writing
+			CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
+					Collections.singleton(new ElementType()), new TestSerializer(false, true));
+			
+			ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+			ObjectOutputStream out = new ObjectOutputStream(buffer);
+			
+			try {
+				out.writeObject(inFormat);
+				fail("should throw an exception");
+			}
+			catch (TestException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Exception not properly forwarded");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDeserializationFailure() {
+		try {
+			// a mock serializer that fails when reading
+			CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
+					Collections.singleton(new ElementType()), new TestSerializer(true, false));
+			
+			ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+			ObjectOutputStream out = new ObjectOutputStream(buffer);
+			out.writeObject(inFormat);
+			out.close();
+			
+			ByteArrayInputStream bais = new ByteArrayInputStream(buffer.toByteArray());
+			ObjectInputStream in = new ObjectInputStream(bais);
+			
+			try {
+				in.readObject();
+				fail("should throw an exception");
+			}
+			catch (TestException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Exception not properly forwarded");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static class TestException extends IOException {
+		private static final long serialVersionUID = 1L;
+	}
+	
+	private static class TestSerializer extends TypeSerializer<ElementType> {
+
+		private static final long serialVersionUID = 1L;
+		
+		private boolean failOnRead;
+		private boolean failOnWrite;
+		
+		public TestSerializer(boolean failOnRead, boolean failOnWrite) {
+			this.failOnRead = failOnRead;
+			this.failOnWrite = failOnWrite;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public boolean isStateful() {
+			return false;
+		}
+
+		@Override
+		public ElementType createInstance() {
+			return new ElementType();
+		}
+
+		@Override
+		public ElementType copy(ElementType from) {
+			return from;
+		}
+
+		@Override
+		public ElementType copy(ElementType from, ElementType reuse) {
+			return from;
+		}
+
+		@Override
+		public int getLength() {
+			return 4;
+		}
+
+		@Override
+		public void serialize(ElementType record, DataOutputView target) throws IOException {
+			if (failOnWrite) {
+				throw new TestException();
+			}
+			target.writeInt(record.getId());
+		}
+
+		@Override
+		public ElementType deserialize(DataInputView source) throws IOException {
+			if (failOnRead) {
+				throw new TestException();
+			}
+			return new ElementType(source.readInt());
+		}
+
+		@Override
+		public ElementType deserialize(ElementType reuse, DataInputView source) throws IOException {
+			if (failOnRead) {
+				throw new TestException();
+			}
+			return new ElementType(source.readInt());
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			target.writeInt(source.readInt());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 0662aa6..5f10a2b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -20,6 +20,7 @@
 package org.apache.flink.api.java.io;
 
 import com.google.common.base.Charsets;
+
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -399,7 +400,8 @@ public class CsvInputFormatTest {
 
 		Tuple5<Integer, String, String, String, Double> result = new Tuple5<Integer, String, String, String, Double>();
 
-		Tuple5[] expectedLines = new Tuple5[]{
+		@SuppressWarnings("unchecked")
+		Tuple5<Integer, String, String, String, Double>[] expectedLines = new Tuple5[] {
 				new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
 				new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
 				new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
@@ -408,8 +410,7 @@ public class CsvInputFormatTest {
 		};
 
 		try {
-
-			for (Tuple5 expected : expectedLines) {
+			for (Tuple5<Integer, String, String, String, Double> expected : expectedLines) {
 				result = format.nextRecord(result);
 				assertEquals(expected, result);
 			}
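
Background on the @SuppressWarnings added above: Java forbids instantiating arrays of parameterized types, so the usual idiom is a raw-typed array creation with a single suppressed unchecked warning at the declaration. A sketch using Tuple2 (values are illustrative):

    // Does not compile: generic array creation.
    // Tuple2<Integer, String>[] pairs = new Tuple2<Integer, String>[2];

    // Compiles, with the warning confined to one declaration:
    @SuppressWarnings("unchecked")
    Tuple2<Integer, String>[] pairs = new Tuple2[] {
            new Tuple2<Integer, String>(1, "one"),
            new Tuple2<Integer, String>(2, "two")
    };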


[2/2] incubator-flink git commit: [FLINK-1263] [optimizer] Implement compatibility checks for binary operators and custom partitioning

Posted by se...@apache.org.
[FLINK-1263] [optimizer] Implement compatibility checks for binary operators and custom partitioning

This closes #223


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cf54a1c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cf54a1c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cf54a1c2

Branch: refs/heads/master
Commit: cf54a1c2af1e228f83609903ba288476c5f05fee
Parents: d7853fd
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 20 16:09:40 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 21 16:56:35 2014 +0100

----------------------------------------------------------------------
 .../apache/flink/compiler/dag/TwoInputNode.java |  23 ++-
 .../compiler/dag/WorksetIterationNode.java      |   7 +-
 .../operators/AbstractJoinDescriptor.java       |  19 ++-
 .../operators/BinaryUnionOpDescriptor.java      |   7 +
 .../operators/CartesianProductDescriptor.java   |   6 +
 .../compiler/operators/CoGroupDescriptor.java   |  14 +-
 .../CrossBlockOuterFirstDescriptor.java         |   5 +-
 .../CrossBlockOuterSecondDescriptor.java        |   5 +-
 .../CrossStreamOuterFirstDescriptor.java        |   5 +-
 .../CrossStreamOuterSecondDescriptor.java       |   5 +-
 .../operators/OperatorDescriptorDual.java       |   3 +
 .../operators/UtilSinkJoinOpDescriptor.java     |   9 +-
 ...naryCustomPartitioningCompatibilityTest.java |  84 ++++++++++
 .../compiler/java/PartitioningOperatorTest.java |  70 ++++++++
 .../operators/CoGroupCompatibilityTest.java     | 161 +++++++++++++++++++
 .../JoinGlobalPropertiesCompatibilityTest.java  | 161 +++++++++++++++++++
 .../JoinOnConflictingPartitioningsTest.java     |  65 ++++++++
 .../common/operators/SingleInputOperator.java   |  12 +-
 .../SingleInputSemanticProperties.java          |  59 ++++++-
 .../operators/base/PartitionOperatorBase.java   |   7 +-
 .../PartitionOperatorTranslationTest.scala      |  60 +++++++
 21 files changed, 743 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
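
The crux of FLINK-1263: two join or co-group inputs may only be treated as co-partitioned if they were partitioned the same way, and with custom partitioners that cannot be assumed even when both sides are partitioned on the join keys, because two Partitioner implementations may route equal keys to different partitions. A hedged illustration using the Partitioner interface from the tests below (both implementations are hypothetical and assume non-negative keys):

    Partitioner<Long> p1 = new Partitioner<Long>() {
        @Override
        public int partition(Long key, int numPartitions) {
            return (int) (key % numPartitions);        // key 2L, 4 partitions -> 2
        }
    };

    Partitioner<Long> p2 = new Partitioner<Long>() {
        @Override
        public int partition(Long key, int numPartitions) {
            return (int) ((key + 1) % numPartitions);  // key 2L, 4 partitions -> 3
        }
    };

    // Both inputs are "custom partitioned" on the join key, yet records with
    // equal keys land on different partitions. The new areCompatible(...) checks
    // therefore require matching produced partitionings AND equal partitioners.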


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
index b329a6e..32f0519 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
@@ -418,18 +418,25 @@ public abstract class TwoInputNode extends OptimizerNode {
 						 *       the pairs of global properties.
 						 * *******************************************************************/
 						
+						outer:
 						for (GlobalPropertiesPair gpp : allGlobalPairs) {
 							if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) && 
 								gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
 							{
-								Channel c1Clone = c1.clone();
-								c1Clone.setRequiredGlobalProps(gpp.getProperties1());
-								c2.setRequiredGlobalProps(gpp.getProperties2());
-								
-								// we form a valid combination, so create the local candidates
-								// for this
-								addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
-								break;
+								for (OperatorDescriptorDual desc : getProperties()) {
+									if (desc.areCompatible(gpp.getProperties1(), gpp.getProperties2(), 
+											c1.getGlobalProperties(), c2.getGlobalProperties()))
+									{
+										Channel c1Clone = c1.clone();
+										c1Clone.setRequiredGlobalProps(gpp.getProperties1());
+										c2.setRequiredGlobalProps(gpp.getProperties2());
+										
+										// we form a valid combination, so create the local candidates
+										// for this
+										addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
+										break outer;
+									}
+								}
 							}
 						}
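
A note on the `break outer;` introduced above: a labeled break exits the loop carrying the label, not just the innermost one, which is what lets the code stop scanning all global-property pairs once one compatible descriptor has produced candidates. A tiny sketch of the control flow (names illustrative):

    outer:
    for (int i = 0; i < 3; i++) {
        for (int j = 0; j < 3; j++) {
            if (i == 1 && j == 1) {
                break outer;  // leaves both loops; a plain break exits only the inner one
            }
        }
    }
    // execution resumes here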
 						

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index 0dd23bf..ae6bc6b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -514,6 +514,12 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		}
 		
 		@Override
+		public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+				GlobalProperties produced1, GlobalProperties produced2) {
+			return true;
+		}
+		
+		@Override
 		public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
 				LocalProperties produced1, LocalProperties produced2) {
 			return true;
@@ -562,6 +568,5 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
 			// no estimates are needed here
 		}
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index 84af77c..cb0e61c 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -64,14 +64,14 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 			// partition both (hash or custom)
 			RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
 			if (customPartitioner == null) {
-				partitioned1.setHashPartitioned(this.keys1);
+				partitioned1.setAnyPartitioning(this.keys1);
 			} else {
 				partitioned1.setCustomPartitioned(this.keys1, this.customPartitioner);
 			}
 			
 			RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties();
 			if (customPartitioner == null) {
-				partitioned2.setHashPartitioned(this.keys2);
+				partitioned2.setAnyPartitioning(this.keys2);
 			} else {
 				partitioned2.setCustomPartitioned(this.keys2, this.customPartitioner);
 			}
@@ -96,6 +96,21 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 		}
 		return pairs;
 	}
+	
+	@Override
+	public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+			GlobalProperties produced1, GlobalProperties produced2)
+	{
+		if (requested1.getPartitioning().isPartitionedOnKey() && requested2.getPartitioning().isPartitionedOnKey()) {
+			return produced1.getPartitioning() == produced2.getPartitioning() && 
+					(produced1.getCustomPartitioner() == null ? 
+						produced2.getCustomPartitioner() == null :
+						produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()));
+		} else {
+			return true;
+		}
+	}
 
 	@Override
 	public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
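
The null-safe partitioner comparison above can be stated more compactly with java.util.Objects.equals (available from Java 7); this is an equivalent restatement, not a proposed change:

    boolean compatible =
            produced1.getPartitioning() == produced2.getPartitioning()
            && java.util.Objects.equals(produced1.getCustomPartitioner(),
                                        produced2.getCustomPartitioner());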

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java
index a78f8e7..c393fc2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java
@@ -27,6 +27,7 @@ import org.apache.flink.compiler.dag.TwoInputNode;
 import org.apache.flink.compiler.dataproperties.GlobalProperties;
 import org.apache.flink.compiler.dataproperties.LocalProperties;
 import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
 import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
 import org.apache.flink.compiler.plan.BinaryUnionPlanNode;
 import org.apache.flink.compiler.plan.Channel;
@@ -87,4 +88,10 @@ public class BinaryUnionOpDescriptor extends OperatorDescriptorDual {
 			LocalProperties produced1, LocalProperties produced2) {
 		return true;
 	}
+	
+	@Override
+	public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+			GlobalProperties produced1, GlobalProperties produced2) {
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
index 5107486..fefd71a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
@@ -78,6 +78,12 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual
 		return Collections.singletonList(new LocalPropertiesPair(
 			new RequestedLocalProperties(), new RequestedLocalProperties()));
 	}
+	
+	@Override
+	public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+			GlobalProperties produced1, GlobalProperties produced2) {
+		return true;
+	}
 
 	@Override
 	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
index 55db89d..90e4c3b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.operators;
 
 import java.util.Collections;
@@ -37,9 +36,6 @@ import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.util.Utils;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
-/**
- * 
- */
 public class CoGroupDescriptor extends OperatorDescriptorDual {
 	
 	private final Ordering ordering1;		// ordering on the first input 
@@ -110,6 +106,16 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 	}
 	
 	@Override
+	public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+			GlobalProperties produced1, GlobalProperties produced2)
+	{
+		return produced1.getPartitioning() == produced2.getPartitioning() && 
+				(produced1.getCustomPartitioner() == null ? 
+					produced2.getCustomPartitioner() == null :
+					produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()));
+	}
+	
+	@Override
 	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
 			LocalProperties produced1, LocalProperties produced2)
 	{

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java
index be14d80..1ec1905 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java
@@ -16,15 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.operators;
 
 import org.apache.flink.compiler.dataproperties.LocalProperties;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
-/**
- * 
- */
+
 public class CrossBlockOuterFirstDescriptor extends CartesianProductDescriptor {
 	
 	public CrossBlockOuterFirstDescriptor() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java
index 4fcfef1..a10f1a4 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java
@@ -16,15 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.operators;
 
 import org.apache.flink.compiler.dataproperties.LocalProperties;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
-/**
- * 
- */
+
 public class CrossBlockOuterSecondDescriptor extends CartesianProductDescriptor {
 	
 	public CrossBlockOuterSecondDescriptor() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java
index f80f404..a0512d4 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java
@@ -16,15 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.operators;
 
 import org.apache.flink.compiler.dataproperties.LocalProperties;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
-/**
- * 
- */
+
 public class CrossStreamOuterFirstDescriptor extends CartesianProductDescriptor {
 	
 	public CrossStreamOuterFirstDescriptor() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java
index 6975760..9b6b122 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java
@@ -16,15 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.operators;
 
 import org.apache.flink.compiler.dataproperties.LocalProperties;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
-/**
- * 
- */
+
 public class CrossStreamOuterSecondDescriptor extends CartesianProductDescriptor {
 	
 	public CrossStreamOuterSecondDescriptor() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
index 11224ec..8eca16e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
@@ -70,6 +70,9 @@ public abstract class OperatorDescriptorDual implements AbstractOperatorDescript
 	
 	protected abstract List<LocalPropertiesPair> createPossibleLocalProperties();
 	
+	public abstract boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+			GlobalProperties produced1, GlobalProperties produced2);
+	
 	public abstract boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
 			LocalProperties produced1, LocalProperties produced2);
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java
index 7ccdda0..a17096f 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java
@@ -59,9 +59,14 @@ public class UtilSinkJoinOpDescriptor extends OperatorDescriptorDual {
 	}
 	
 	@Override
+	public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+			GlobalProperties produced1, GlobalProperties produced2) {
+		return true;
+	}
+	
+	@Override
 	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
-			LocalProperties produced1, LocalProperties produced2)
-	{
+			LocalProperties produced1, LocalProperties produced2) {
 		return true;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
new file mode 100644
index 0000000..ff3d78e
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.custompartition;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings({"serial","unchecked"})
+public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase {
+
+	@Test
+	public void testCompatiblePartitioning() {
+		try {
+			final Partitioner<Long> partitioner = new Partitioner<Long>() {
+				@Override
+				public int partition(Long key, int numPartitions) {
+					return 0;
+				}
+			};
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			input1.partitionCustom(partitioner, 1)
+				.join(input2.partitionCustom(partitioner, 0))
+				.where(1).equalTo(0)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode partitioner1 = (SingleInputPlanNode) join.getInput1().getSource();
+			SingleInputPlanNode partitioner2 = (SingleInputPlanNode) join.getInput2().getSource();
+
+			assertEquals(ShipStrategyType.FORWARD, join.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, join.getInput2().getShipStrategy());
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner1.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner2.getInput().getShipStrategy());
+			assertEquals(partitioner, partitioner1.getInput().getPartitioner());
+			assertEquals(partitioner, partitioner2.getInput().getPartitioner());
+			
+			new NepheleJobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/java/PartitioningOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/PartitioningOperatorTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/PartitioningOperatorTest.java
new file mode 100644
index 0000000..26185c7
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/PartitioningOperatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.java;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitioningOperatorTest extends CompilerTestBase {
+
+	@Test
+	public void testPartitioningOperatorPreservesFields() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<Long, Long>(0L, 0L)));
+			
+			data.partitionCustom(new Partitioner<Long>() {
+					public int partition(Long key, int numPartitions) { return key.intValue(); }
+				}, 1)
+				.groupBy(1)
+				.reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();
+
+			assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java
new file mode 100644
index 0000000..ae65094
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CoGroupCompatibilityTest {
+
+	@Test
+	public void checkCompatiblePartitionings() {
+		try {
+			final FieldList keysLeft = new FieldList(1, 4);
+			final FieldList keysRight = new FieldList(3, 1);
+			
+			CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
+			
+			// test compatible hash partitioning
+			{
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setHashPartitioned(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setHashPartitioned(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setHashPartitioned(keysLeft);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setHashPartitioned(keysRight);
+				
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			
+			// test compatible custom partitioning
+			{
+				Partitioner<Object> part = new Partitioner<Object>() {
+					@Override
+					public int partition(Object key, int numPartitions) {
+						return 0;
+					}
+				};
+				
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setCustomPartitioned(keysLeft, part);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setCustomPartitioned(keysRight, part);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setCustomPartitioned(keysLeft, part);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part);
+				
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			
+			// test custom partitioning matching any partitioning
+			{
+				Partitioner<Object> part = new Partitioner<Object>() {
+					@Override
+					public int partition(Object key, int numPartitions) {
+						return 0;
+					}
+				};
+				
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setAnyPartitioning(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setAnyPartitioning(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setCustomPartitioned(keysLeft, part);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part);
+				
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void checkIncompatiblePartitionings() {
+		try {
+			final FieldList keysLeft = new FieldList(1);
+			final FieldList keysRight = new FieldList(3);
+			
+			final Partitioner<Object> part = new Partitioner<Object>() {
+				@Override
+				public int partition(Object key, int numPartitions) {
+					return 0;
+				}
+			};
+			final Partitioner<Object> part2 = new Partitioner<Object>() {
+				@Override
+				public int partition(Object key, int numPartitions) {
+					return 0;
+				}
+			};
+			
+			CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
+			
+			// test incompatible hash with custom partitioning
+			{
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setAnyPartitioning(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setAnyPartitioning(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setHashPartitioned(keysLeft);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part);
+				
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			
+			// test incompatible custom partitionings
+			{
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setAnyPartitioning(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setAnyPartitioning(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setCustomPartitioned(keysLeft, part);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part2);
+				
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinGlobalPropertiesCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinGlobalPropertiesCompatibilityTest.java
new file mode 100644
index 0000000..13f4102
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinGlobalPropertiesCompatibilityTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class JoinGlobalPropertiesCompatibilityTest {
+
+	@Test
+	public void checkCompatiblePartitionings() {
+		try {
+			final FieldList keysLeft = new FieldList(1, 4);
+			final FieldList keysRight = new FieldList(3, 1);
+			
+			SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
+			
+			// test compatible hash partitioning
+			{
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setHashPartitioned(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setHashPartitioned(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setHashPartitioned(keysLeft);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setHashPartitioned(keysRight);
+				
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			
+			// test compatible custom partitioning
+			{
+				Partitioner<Object> part = new Partitioner<Object>() {
+					@Override
+					public int partition(Object key, int numPartitions) {
+						return 0;
+					}
+				};
+				
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setCustomPartitioned(keysLeft, part);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setCustomPartitioned(keysRight, part);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setCustomPartitioned(keysLeft, part);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part);
+				
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			
+			// test custom partitioning matching any partitioning
+			{
+				Partitioner<Object> part = new Partitioner<Object>() {
+					@Override
+					public int partition(Object key, int numPartitions) {
+						return 0;
+					}
+				};
+				
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setAnyPartitioning(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setAnyPartitioning(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setCustomPartitioned(keysLeft, part);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part);
+				
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void checkIncompatiblePartitionings() {
+		try {
+			final FieldList keysLeft = new FieldList(1);
+			final FieldList keysRight = new FieldList(3);
+			
+			final Partitioner<Object> part = new Partitioner<Object>() {
+				@Override
+				public int partition(Object key, int numPartitions) {
+					return 0;
+				}
+			};
+			final Partitioner<Object> part2 = new Partitioner<Object>() {
+				@Override
+				public int partition(Object key, int numPartitions) {
+					return 0;
+				}
+			};
+			
+			SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
+			
+			// test incompatible hash with custom partitioning
+			{
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setAnyPartitioning(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setAnyPartitioning(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setHashPartitioned(keysLeft);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part);
+				
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			
+			// test incompatible custom partitionings
+			{
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setAnyPartitioning(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setAnyPartitioning(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setCustomPartitioned(keysLeft, part);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part2);
+				
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinOnConflictingPartitioningsTest.java
new file mode 100644
index 0000000..c8713e6
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinOnConflictingPartitioningsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerException;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.PactCompiler;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
+
+	@Test
+	public void testRejectJoinOnHashAndRangePartitioning() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			
+			Configuration cfg = new Configuration();
+			cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+			cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+			
+			input.join(input).where(0).equalTo(0)
+				.withParameters(cfg)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			try {
+				compileNoStats(p);
+				fail("This should fail with an exception");
+			}
+			catch (CompilerException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
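
For contrast, the same hint mechanism with consistent strategies on both
inputs is accepted by the compiler. A hedged sketch, dropped into the same
test harness (env, input, and compileNoStats as above; both inputs
hash-repartitioned, so no conflict arises):

    Configuration cfg = new Configuration();
    cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
            PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
    cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT,
            PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);

    input.join(input).where(0).equalTo(0)
        .withParameters(cfg)
        .print();

    compileNoStats(env.createProgramPlan());   // expected to compile without a CompilerException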

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
index f1bf2ad..5b491bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
@@ -35,19 +35,13 @@ import org.apache.flink.util.Visitor;
  */
 public abstract class SingleInputOperator<IN, OUT, FT extends Function> extends AbstractUdfOperator<OUT, FT> {
 	
-	/**
-	 * The input which produces the data consumed by this operator.
-	 */
+	/** The input which produces the data consumed by this operator. */
 	protected Operator<IN> input;
 	
-	/**
-	 * The positions of the keys in the tuple.
-	 */
+	/** The positions of the keys in the tuple. */
 	private final int[] keyFields;
 	
-	/**
-	 * Semantic properties of the associated function.
-	 */
+	/** Semantic properties of the associated function. */
 	private SingleInputSemanticProperties semanticProperties;
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
index 2de53eb..45f020a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators;
 
 import java.util.HashMap;
@@ -149,5 +148,63 @@ public class SingleInputSemanticProperties extends SemanticProperties {
 		this.forwardedFields = new HashMap<Integer,FieldSet>();
 		this.readFields = null;
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static class AllFieldsConstantProperties extends SingleInputSemanticProperties {
+		
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public FieldSet getReadFields() {
+			return FieldSet.EMPTY_SET;
+		}
+		
+		@Override
+		public FieldSet getWrittenFields() {
+			return FieldSet.EMPTY_SET;
+		}
+
+		@Override
+		public FieldSet getForwardedField(int sourceField) {
+			return new FieldSet(sourceField);
+		}
+		
+		// ----- all mutating operations are unsupported -----
+		
+		@Override
+		public void addForwardedField(int sourceField, FieldSet destinationFields) {
+			throw new UnsupportedOperationException();
+		}
 		
+		@Override
+		public void addForwardedField(int sourceField, int destinationField) {
+			throw new UnsupportedOperationException();
+		}
+		
+		@Override
+		public void setForwardedField(int sourceField, FieldSet destinationFields) {
+			throw new UnsupportedOperationException();
+		}
+		
+		@Override
+		public void addReadFields(FieldSet readFields) {
+			throw new UnsupportedOperationException();
+		}
+		
+		@Override
+		public void setReadFields(FieldSet readFields) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void addWrittenFields(FieldSet writtenFields) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void setWrittenFields(FieldSet writtenFields) {
+			throw new UnsupportedOperationException();
+		}
+	}
 }
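
What the new AllFieldsConstantProperties encodes, in terms of its own
accessors (a usage sketch, not test code from this commit):

    SingleInputSemanticProperties props =
            new SingleInputSemanticProperties.AllFieldsConstantProperties();

    props.getForwardedField(2);    // FieldSet {2}: every field is forwarded to itself
    props.getReadFields();         // FieldSet.EMPTY_SET: no field is read
    props.getWrittenFields();      // FieldSet.EMPTY_SET: no field is written

    // All mutators are rejected, keeping instances effectively immutable:
    // props.setReadFields(new FieldSet(0));   // would throw UnsupportedOperationException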

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
index ee3b259..6735298 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.NoOpFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 
 /**
- *
  * @param <IN> The input and result type.
  */
 public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpFunction> {
@@ -79,6 +79,11 @@ public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpF
 		}
 		this.customPartitioner = customPartitioner;
 	}
+	
+	@Override
+	public SingleInputSemanticProperties getSemanticProperties() {
+		return new SingleInputSemanticProperties.AllFieldsConstantProperties();
+	}
 
 	// --------------------------------------------------------------------------------------------
 	

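The override above is what the new Scala test below exercises: because the
partition operator reports every field as constant, a grouping on the
custom-partitioned key can forward records without a second shuffle. The
same pipeline as a hedged Java sketch (assuming the Java-side
partitionCustom(Partitioner, int) mirrors the Scala API used below; needs
org.apache.flink.api.common.functions.Partitioner and GroupReduceFunction,
plus org.apache.flink.util.Collector):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L));

    data.partitionCustom(new Partitioner<Long>() {
            @Override
            public int partition(Long key, int numPartitions) {
                return key.intValue();       // shuffle happens here: PARTITION_CUSTOM
            }
        }, 1)
        .groupBy(1)                          // partitioning is reused: FORWARD, no re-shuffle
        .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            @Override
            public void reduce(Iterable<Tuple2<Long, Long>> values,
                    Collector<Tuple2<Long, Long>> out) {
                for (Tuple2<Long, Long> v : values) {
                    out.collect(v);
                }
            }
        })
        .print();
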
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
new file mode 100644
index 0000000..1e44413
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.compiler
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.flink.api.scala._
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType
+import org.apache.flink.test.compiler.util.CompilerTestBase
+import org.apache.flink.compiler.plan.SingleInputPlanNode
+import org.apache.flink.api.common.functions.Partitioner
+
+class PartitionOperatorTranslationTest extends CompilerTestBase {
+
+  @Test
+  def testPartitioningOperatorPreservesFields() {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val data = env.fromElements( (0L, 0L) )
+      data.partitionCustom(new Partitioner[Long]() {
+          def partition(key: Long, numPartitions: Int): Int = key.intValue()
+        }, 1)
+        .groupBy(1).reduceGroup( x => x)
+        .print()
+      
+      val p = env.createProgramPlan()
+      val op = compileNoStats(p)
+      
+      val sink = op.getDataSinks.iterator().next()
+      val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode]
+      val partitioner = reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode]
+
+      assertEquals(ShipStrategyType.FORWARD, reducer.getInput.getShipStrategy)
+      assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy)
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+}