Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/22 13:03:59 UTC

[2/3] flink git commit: [FLINK-703] [java api] Use complete element as join key

[FLINK-703] [java api] Use complete element as join key


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30a74c76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30a74c76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30a74c76

Branch: refs/heads/master
Commit: 30a74c768b88043c173053f5d37bfb0fde85f149
Parents: e1618e2
Author: Chiwan Park <ch...@icloud.com>
Authored: Mon Apr 6 03:18:23 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Apr 21 19:03:15 2015 +0200

----------------------------------------------------------------------
 .../operators/base/CoGroupOperatorBase.java     | 10 ++-
 .../base/GroupCombineOperatorBase.java          | 42 +++++----
 .../operators/base/GroupReduceOperatorBase.java | 41 +++++----
 .../apache/flink/api/java/operators/Keys.java   | 45 ++++++----
 .../api/java/operator/CoGroupOperatorTest.java  | 94 +++++++++++++++++---
 .../flink/api/java/operator/GroupingTest.java   | 34 ++++++-
 .../api/java/operator/JoinOperatorTest.java     | 72 +++++++++++++++
 .../optimizer/postpass/JavaApiPostPass.java     |  5 --
 .../test/javaApiOperators/CoGroupITCase.java    | 82 ++++++++++++++++-
 .../javaApiOperators/GroupReduceITCase.java     | 38 ++++++--
 .../flink/test/javaApiOperators/JoinITCase.java | 31 +++++++
 11 files changed, 406 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
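
For readers skimming the archive: the net effect of this change is that an atomic (non-composite) type such as Integer or String can now be used directly as a grouping, join, or coGroup key via the select-all field expression "*" (or "_" from the Scala API). Below is a minimal, hedged sketch of the resulting user-facing API, modelled on the tests added further down in this patch; the class name and sample data are illustrative only, and a sink plus env.execute() would still be needed to actually run the plan.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class AtomicKeyExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Integer> ints = env.fromElements(1, 2, 2, 3);
		DataSet<Tuple3<Integer, Long, String>> tuples = env.fromElements(
				new Tuple3<Integer, Long, String>(1, 1L, "Hi"),
				new Tuple3<Integer, Long, String>(2, 2L, "Hello"));

		// Group an atomic type on the complete element and reduce each group.
		ints.groupBy("*").reduceGroup(new GroupReduceFunction<Integer, Integer>() {
			@Override
			public void reduce(Iterable<Integer> values, Collector<Integer> out) {
				out.collect(values.iterator().next());
			}
		});

		// Join and coGroup an atomic DataSet against a tuple field.
		ints.join(tuples).where("*").equalTo(0);
		ints.coGroup(tuples).where("*").equalTo(0);
	}
}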


http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index dbebeb4..7be5650 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.GenericPairComparator;
@@ -273,12 +274,15 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		return result;
 	}
 
+	@SuppressWarnings("unchecked")
 	private <T> TypeComparator<T> getTypeComparator(ExecutionConfig executionConfig, TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
-		if (!(inputType instanceof CompositeType)) {
-			throw new InvalidProgramException("Input types of coGroup must be composite types.");
+		if (inputType instanceof CompositeType) {
+			return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig);
+		} else if (inputType instanceof AtomicType) {
+			return ((AtomicType<T>) inputType).createComparator(inputSortDirections[0], executionConfig);
 		}
 
-		return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig);
+		throw new InvalidProgramException("Input type of coGroup must be one of composite types or atomic types.");
 	}
 
 	private static class CoGroupSortListIterator<IN1, IN2> {

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
index 27fbc1c..c7ba92b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -77,6 +78,16 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
 		return this.groupOrder;
 	}
 
+	private TypeComparator<IN> getTypeComparator(TypeInformation<IN> typeInfo, int[] sortColumns, boolean[] sortOrderings, ExecutionConfig executionConfig) {
+		if (typeInfo instanceof CompositeType) {
+			return ((CompositeType<IN>) typeInfo).createComparator(sortColumns, sortOrderings, 0, executionConfig);
+		} else if (typeInfo instanceof AtomicType) {
+			return ((AtomicType<IN>) typeInfo).createComparator(sortOrderings[0], executionConfig);
+		}
+
+		throw new InvalidProgramException("Input type of GroupCombine must be one of composite types or atomic types.");
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
@@ -87,11 +98,6 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
 		TypeInformation<IN> inputType = operatorInfo.getInputType();
 
 		int[] keyColumns = getKeyColumns(0);
-
-		if (!(inputType instanceof CompositeType) && (keyColumns.length > 0 || groupOrder != null)) {
-			throw new InvalidProgramException("Grouping or group-sorting is only possible on composite type.");
-		}
-
 		int[] sortColumns = keyColumns;
 		boolean[] sortOrderings = new boolean[sortColumns.length];
 
@@ -100,19 +106,17 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
 			sortOrderings = ArrayUtils.addAll(sortOrderings, groupOrder.getFieldSortDirections());
 		}
 
-		if (inputType instanceof CompositeType) {
-			if(sortColumns.length == 0) { // => all reduce. No comparator
-				Preconditions.checkArgument(sortOrderings.length == 0);
-			} else {
-				final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
-
-				Collections.sort(inputData, new Comparator<IN>() {
-					@Override
-					public int compare(IN o1, IN o2) {
-						return sortComparator.compare(o1, o2);
-					}
-				});
-			}
+		if(sortColumns.length == 0) { // => all reduce. No comparator
+			Preconditions.checkArgument(sortOrderings.length == 0);
+		} else {
+			final TypeComparator<IN> sortComparator = getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
+
+			Collections.sort(inputData, new Comparator<IN>() {
+				@Override
+				public int compare(IN o1, IN o2) {
+					return sortComparator.compare(o1, o2);
+				}
+			});
 		}
 
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
@@ -133,7 +137,7 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
 		} else {
 			final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
 			boolean[] keyOrderings = new boolean[keyColumns.length];
-			final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig);
+			final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);
 
 			ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 57f07f3..3056fe7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -148,6 +149,16 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 	public Partitioner<?> getCustomPartitioner() {
 		return customPartitioner;
 	}
+
+	private TypeComparator<IN> getTypeComparator(TypeInformation<IN> typeInfo, int[] sortColumns, boolean[] sortOrderings, ExecutionConfig executionConfig) {
+		if (typeInfo instanceof CompositeType) {
+			return ((CompositeType<IN>) typeInfo).createComparator(sortColumns, sortOrderings, 0, executionConfig);
+		} else if (typeInfo instanceof AtomicType) {
+			return ((AtomicType<IN>) typeInfo).createComparator(sortOrderings[0], executionConfig);
+		}
+
+		throw new InvalidProgramException("Input type of GroupReduce must be one of composite types or atomic types.");
+	}
 	
 	// --------------------------------------------------------------------------------------------
 
@@ -159,11 +170,6 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		TypeInformation<IN> inputType = operatorInfo.getInputType();
 
 		int[] keyColumns = getKeyColumns(0);
-
-		if (!(inputType instanceof CompositeType) && (keyColumns.length > 0 || groupOrder != null)) {
-			throw new InvalidProgramException("Grouping or group-sorting is only possible on composite type.");
-		}
-
 		int[] sortColumns = keyColumns;
 		boolean[] sortOrderings = new boolean[sortColumns.length];
 
@@ -172,19 +178,16 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 			sortOrderings = ArrayUtils.addAll(sortOrderings, groupOrder.getFieldSortDirections());
 		}
 
-		if (inputType instanceof CompositeType) {
-			if(sortColumns.length == 0) { // => all reduce. No comparator
-				Preconditions.checkArgument(sortOrderings.length == 0);
-			} else {
-				final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
-	
-				Collections.sort(inputData, new Comparator<IN>() {
-					@Override
-					public int compare(IN o1, IN o2) {
-						return sortComparator.compare(o1, o2);
-					}
-				});
-			}
+		if(sortColumns.length == 0) { // => all reduce. No comparator
+			Preconditions.checkArgument(sortOrderings.length == 0);
+		} else {
+			final TypeComparator<IN> sortComparator = getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
+			Collections.sort(inputData, new Comparator<IN>() {
+				@Override
+				public int compare(IN o1, IN o2) {
+					return sortComparator.compare(o1, o2);
+				}
+			});
 		}
 
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
@@ -205,7 +208,7 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		} else {
 			final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
 			boolean[] keyOrderings = new boolean[keyColumns.length];
-			final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig);
+			final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);
 
 			ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index a2cde07..ee233e8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -274,24 +274,33 @@ public abstract class Keys<T> {
 		 * Create ExpressionKeys from String-expressions
 		 */
 		public ExpressionKeys(String[] expressionsIn, TypeInformation<T> type) {
-			if(!(type instanceof CompositeType<?>)) {
-				throw new IllegalArgumentException("Key expressions are only supported on POJO types and Tuples. "
-						+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
-			}
-			CompositeType<T> cType = (CompositeType<T>) type;
-			
-			String[] expressions = removeDuplicates(expressionsIn);
-			if(expressionsIn.length != expressions.length) {
-				LOG.warn("The key expressions contained duplicates. They are now unique");
-			}
-			// extract the keys on their flat position
-			keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length);
-			for (int i = 0; i < expressions.length; i++) {
-				List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check
-				if(keys.size() == 0) {
-					throw new IllegalArgumentException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
+			Preconditions.checkNotNull(expressionsIn, "Field expression cannot be null.");
+
+			if (type instanceof AtomicType) {
+				if (!type.isKeyType()) {
+					throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
+				} else if (expressionsIn.length != 1 || !(Keys.ExpressionKeys.SELECT_ALL_CHAR.equals(expressionsIn[0]) || Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA.equals(expressionsIn[0]))) {
+					throw new IllegalArgumentException("Field expression for atomic type must be equal to '*' or '_'.");
+				}
+
+				keyFields = new ArrayList<FlatFieldDescriptor>(1);
+				keyFields.add(new FlatFieldDescriptor(0, type));
+			} else {
+				CompositeType<T> cType = (CompositeType<T>) type;
+
+				String[] expressions = removeDuplicates(expressionsIn);
+				if(expressionsIn.length != expressions.length) {
+					LOG.warn("The key expressions contained duplicates. They are now unique");
+				}
+				// extract the keys on their flat position
+				keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length);
+				for (int i = 0; i < expressions.length; i++) {
+					List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check
+					if(keys.size() == 0) {
+						throw new IllegalArgumentException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
+					}
+					keyFields.addAll(keys);
 				}
-				keyFields.addAll(keys);
 			}
 		}
 		
@@ -410,7 +419,7 @@ public abstract class Keys<T> {
 			return Arrays.copyOfRange(fields, 0, k+1);
 		}
 	}
-	
+
 	public static class IncompatibleKeysException extends Exception {
 		private static final long serialVersionUID = 1L;
 		public static final String SIZE_MISMATCH_MESSAGE = "The number of specified keys is different.";
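
To summarize the validation that the reworked ExpressionKeys constructor applies to atomic types (a hedged reading of the code above and of the operator tests further down): exactly one key expression is allowed, it must be the select-all expression "*" (or "_"), and the type itself must be usable as a key (type.isKeyType()); anything else is rejected at pre-flight time with an exception. A small illustrative sketch, with the rejected calls left commented out:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class AtomicKeyValidation {

	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Integer> ints = env.fromElements(0, 1, 2, 3);

		// Accepted: the single select-all expression on an atomic type.
		ints.groupBy("*");

		// Rejected at pre-flight time (see the grouping/join/coGroup tests in this patch):
		// ints.groupBy("*", "someField");   // more than one expression on an atomic type
		// ints.groupBy("someField");        // not the select-all expression
		// env.fromElements(new java.util.ArrayList<Integer>()).groupBy("*");
		//                                   // generic type that is not a valid key type
	}
}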

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index 60754e6..f32f6a9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -18,26 +18,26 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operator.JoinOperatorTest.CustomType;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operator.JoinOperatorTest.CustomType;
+
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.junit.Assert.assertTrue;
 
@@ -181,6 +181,78 @@ public class CoGroupOperatorTest {
 		// should not work, cogroup key non-existent
 		ds1.coGroup(ds2).where("myNonExistent").equalTo("myInt");
 	}
+
+	@Test
+	public void testCoGroupKeyAtomicExpression1() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<Integer> ds2 = env.fromElements(0, 0, 1);
+
+		ds1.coGroup(ds2).where("myInt").equalTo("*");
+	}
+
+	@Test
+	public void testCoGroupKeyAtomicExpression2() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 0, 1);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		ds1.coGroup(ds2).where("*").equalTo("myInt");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testCoGroupKeyAtomicInvalidExpression1() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 0, 1);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		ds1.coGroup(ds2).where("*", "invalidKey");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testCoGroupKeyAtomicInvalidExpression2() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 0, 1);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		ds1.coGroup(ds2).where("invalidKey");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testCoGroupKeyAtomicInvalidExpression3() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<Integer> ds2 = env.fromElements(0, 0, 1);
+
+		ds1.coGroup(ds2).where("myInt").equalTo("invalidKey");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testCoGroupKeyAtomicInvalidExpression4() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<Integer> ds2 = env.fromElements(0, 0, 1);
+
+		ds1.coGroup(ds2).where("myInt").equalTo("*", "invalidKey");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testCoGroupKeyAtomicInvalidExpression5() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<ArrayList<Integer>> ds1 = env.fromElements(new ArrayList<Integer>());
+		DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+		ds1.coGroup(ds2).where("*");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testCoGroupKeyAtomicInvalidExpression6() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+		DataSet<ArrayList<Integer>> ds2 = env.fromElements(new ArrayList<Integer>());
+
+		ds1.coGroup(ds2).where("*").equalTo("*");
+	}
 	
 	@Test
 	public void testCoGroupKeyExpressions1Nested() {

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index 314695f..b3922b3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -143,7 +143,7 @@ public class GroupingTest {
 		}
 	}
 
-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = InvalidProgramException.class)
 	public void testGroupByKeyExpressions2() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -551,6 +551,38 @@ public class GroupingTest {
 						}, Order.ASCENDING);
 	}
 
+	@Test
+	public void testGroupingAtomicType() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> dataSet = env.fromElements(0, 1, 1, 2, 0, 0);
+
+		dataSet.groupBy("*");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupAtomicTypeWithInvalid1() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> dataSet = env.fromElements(0, 1, 2, 3);
+
+		dataSet.groupBy("*", "invalidField");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupAtomicTypeWithInvalid2() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> dataSet = env.fromElements(0, 1, 2, 3);
+
+		dataSet.groupBy("invalidField");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupAtomicTypeWithInvalid3() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<ArrayList<Integer>> dataSet = env.fromElements(new ArrayList<Integer>());
+
+		dataSet.groupBy("*");
+	}
+
 
 	public static class CustomType implements Serializable {
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index f1aadca..be964cc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -585,6 +585,78 @@ public class JoinOperatorTest {
 					}
 				);
 	}
+
+	@Test
+	public void testJoinKeyAtomic1() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		ds1.join(ds2).where("*").equalTo(0);
+	}
+
+	@Test
+	public void testJoinKeyAtomic2() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+		ds1.join(ds2).where(0).equalTo("*");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyInvalidAtomic1() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		ds1.join(ds2).where("*", "invalidKey");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyInvalidAtomic2() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+		ds1.join(ds2).where(0).equalTo("*", "invalidKey");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyInvalidAtomic3() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		ds1.join(ds2).where("invalidKey");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyInvalidAtomic4() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+		ds1.join(ds2).where(0).equalTo("invalidKey");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyInvalidAtomic5() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<ArrayList<Integer>> ds1 = env.fromElements(new ArrayList<Integer>());
+		DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+		ds1.join(ds2).where("*").equalTo("*");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyInvalidAtomic6() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+		DataSet<ArrayList<Integer>> ds2 = env.fromElements(new ArrayList<Integer>());
+
+		ds1.join(ds2).where("*").equalTo("*");
+	}
 	
 	@Test
 	public void testJoinProjection1() {

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
index 5fdf3dd..a685ff4 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
@@ -41,7 +41,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
@@ -305,10 +304,6 @@ public class JavaApiPostPass implements OptimizerPostPass {
 	}
 	
 	private static <T1 extends Tuple, T2 extends Tuple> TypePairComparatorFactory<T1,T2> createPairComparator(TypeInformation<?> typeInfo1, TypeInformation<?> typeInfo2) {
-		if (!(typeInfo1.isTupleType() || typeInfo1 instanceof PojoTypeInfo) && (typeInfo2.isTupleType() || typeInfo2 instanceof PojoTypeInfo)) {
-			throw new RuntimeException("The runtime currently supports only keyed binary operations (such as joins) on tuples and POJO types.");
-		}
-		
 //		@SuppressWarnings("unchecked")
 //		TupleTypeInfo<T1> info1 = (TupleTypeInfo<T1>) typeInfo1;
 //		@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 99f568e..84c05d6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.DataSet;
@@ -47,6 +43,10 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class CoGroupITCase extends MultipleProgramsTestBase {
 
@@ -488,6 +488,36 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 				"-1,30000,Flink\n";
 	}
 
+	@Test
+	public void testCoGroupWithAtomicType1() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> ds2 = env.fromElements(0, 1, 2);
+
+		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*").with(new CoGroupAtomic1());
+
+		coGroupDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "(1,1,Hi)\n" +
+			"(2,2,Hello)";
+	}
+
+	@Test
+	public void testCoGroupWithAtomicType2() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 1, 2);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0).with(new CoGroupAtomic2());
+
+		coGroupDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "(1,1,Hi)\n" +
+			"(2,2,Hello)";
+	}
+
 	public static class KeySelector1 implements KeySelector<POJO, Long> {
 		private static final long serialVersionUID = 1L;
 
@@ -719,4 +749,48 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 			}
 		}
 	}
+
+	public static class CoGroupAtomic1 implements CoGroupFunction<Tuple3<Integer, Long, String>, Integer, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, Iterable<Integer> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			List<Integer> ints = new ArrayList<Integer>();
+
+			for (Integer i : second) {
+				ints.add(i);
+			}
+
+			for (Tuple3<Integer, Long, String> t : first) {
+				for (Integer i : ints) {
+					if (t.f0.equals(i)) {
+						out.collect(t);
+					}
+				}
+			}
+		}
+	}
+
+	public static class CoGroupAtomic2 implements CoGroupFunction<Integer, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Integer> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			List<Integer> ints = new ArrayList<Integer>();
+
+			for (Integer i : first) {
+				ints.add(i);
+			}
+
+			for (Tuple3<Integer, Long, String> t : second) {
+				for (Integer i : ints) {
+					if (t.f0.equals(i)) {
+						out.collect(t);
+					}
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 9eb9a37..cf6b529 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -18,21 +18,20 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.util.Collection;
-import java.util.Iterator;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CrazyNested;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
@@ -49,10 +48,13 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import scala.math.BigInt;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
 @SuppressWarnings("serial")
 @RunWith(Parameterized.class)
 public class GroupReduceITCase extends MultipleProgramsTestBase {
@@ -1063,6 +1065,26 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 
 	}
 
+	@Test
+	public void testGroupReduceWithAtomicValue() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds = env.fromElements(1, 1, 2, 3, 4);
+		DataSet<Integer> reduceDs = ds.groupBy("*").reduceGroup(new GroupReduceFunction<Integer, Integer>() {
+			@Override
+			public void reduce(Iterable<Integer> values, Collector<Integer> out) throws Exception {
+				out.collect(values.iterator().next());
+			}
+		});
+
+		reduceDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "1\n" +
+			"2\n" +
+			"3\n" +
+			"4";
+	}
+
 	public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
 		@Override
 		public void reduce(

http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 0080fb1..fe436a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -663,6 +663,37 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				"((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n";
 	}
 
+	@Test
+	public void testJoinWithAtomicType1() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> ds2 = env.fromElements(1, 2);
+
+		DataSet<Tuple2<Tuple3<Integer, Long, String>, Integer>> joinDs = ds1.join(ds2).where(0).equalTo("*");
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "(1,1,Hi),1\n" +
+			"(2,2,Hello),2";
+	}
+
+	public void testJoinWithAtomicType2() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> ds1 = env.fromElements(1, 2);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+
+		DataSet<Tuple2<Integer, Tuple3<Integer, Long, String>>> joinDs = ds1.join(ds2).where("*").equalTo(0);
+
+		joinDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,(1,1,Hi)\n" +
+			"2,(2,2,Hello)";
+	}
+
 	public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
 
 		@Override