You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:10 UTC

[10/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
index 6215f31..638eb5c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.types.IntValue;
+
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,13 +42,13 @@ import java.util.Random;
 
 import static org.hamcrest.Matchers.is;
 
-/*
+/**
  * These programs demonstrate the effects of user defined functions which modify input objects or return locally created
  * objects that are retained and reused on future calls. The programs do not retain and later modify input objects.
  */
 public class OverwriteObjects {
 
-	public final static Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class);
+	public static final Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class);
 
 	// DataSets are created with this number of elements
 	private static final int NUMBER_OF_ELEMENTS = 3_000_000;
@@ -71,7 +72,7 @@ public class OverwriteObjects {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 
-		for (int parallelism = MAX_PARALLELISM ; parallelism > 0 ; parallelism--) {
+		for (int parallelism = MAX_PARALLELISM; parallelism > 0; parallelism--) {
 			LOG.info("Parallelism = {}", parallelism);
 
 			env.setParallelism(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java b/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
index c8604cb..46be968 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
@@ -36,7 +36,7 @@ import java.util.Random;
  * (See also http://peel-framework.org/2016/04/07/hash-aggregations-in-flink.html)
  */
 public class ReducePerformance {
-	
+
 	public static void main(String[] args) throws Exception {
 
 		final int numElements = 40_000_000;
@@ -120,7 +120,7 @@ public class ReducePerformance {
 			int rem = numElements % numPartitions;
 			SplittableRandomIterator<T, B>[] res = new SplittableRandomIterator[numPartitions];
 			for (int i = 0; i < numPartitions; i++) {
-				res[i] = new SplittableRandomIterator<T, B>(i < rem ? splitSize : splitSize + 1, (B)baseIterator.copy());
+				res[i] = new SplittableRandomIterator<T, B>(i < rem ? splitSize : splitSize + 1, (B) baseIterator.copy());
 			}
 			return res;
 		}
@@ -140,7 +140,6 @@ public class ReducePerformance {
 		CopyableIterator<T> copy();
 	}
 
-
 	private static final class TupleIntIntIterator implements CopyableIterator<Tuple2<Integer, Integer>>, Serializable {
 
 		private final int keyRange;
@@ -183,7 +182,6 @@ public class ReducePerformance {
 		}
 	}
 
-
 	private static final class TupleStringIntIterator implements CopyableIterator<Tuple2<String, Integer>>, Serializable {
 
 		private final int keyRange;
@@ -226,7 +224,6 @@ public class ReducePerformance {
 		}
 	}
 
-
 	private static final class SumReducer<K> implements ReduceFunction<Tuple2<K, Integer>> {
 		@Override
 		public Tuple2<K, Integer> reduce(Tuple2<K, Integer> a, Tuple2<K, Integer> b) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 90dbe80..c7f43fa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -31,24 +31,27 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 
 import static org.junit.Assert.fail;
 
+/**
+ * Manual test to evaluate impact of checkpointing on latency.
+ */
 public class StreamingScalabilityAndLatency {
-	
+
 	public static void main(String[] args) throws Exception {
 		if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) {
 			throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
 		}
-		
-		final int TASK_MANAGERS = 1;
-		final int SLOTS_PER_TASK_MANAGER = 80;
-		final int PARALLELISM = TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+
+		final int taskManagers = 1;
+		final int slotsPerTaskManager = 80;
+		final int parallelism = taskManagers * slotsPerTaskManager;
 
 		LocalFlinkMiniCluster cluster = null;
 
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, TASK_MANAGERS);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagers);
 			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
 			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
 
 			config.setInteger("taskmanager.net.server.numThreads", 1);
@@ -56,8 +59,8 @@ public class StreamingScalabilityAndLatency {
 
 			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
-			
-			runPartitioningProgram(cluster.getLeaderRPCPort(), PARALLELISM);
+
+			runPartitioningProgram(cluster.getLeaderRPCPort(), parallelism);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -69,7 +72,7 @@ public class StreamingScalabilityAndLatency {
 			}
 		}
 	}
-	
+
 	private static void runPartitioningProgram(int jobManagerPort, int parallelism) throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
 		env.setParallelism(parallelism);
@@ -83,23 +86,22 @@ public class StreamingScalabilityAndLatency {
 			.map(new IdMapper<Tuple2<Long, Long>>())
 			.keyBy(0)
 			.addSink(new TimestampingSink());
-		
+
 		env.execute("Partitioning Program");
 	}
-	
-	public static class TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> {
+
+	private static class TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = -151782334777482511L;
 
 		private volatile boolean running = true;
-		
-		
+
 		@Override
 		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-			
+
 			long num = 100;
 			long counter = (long) (Math.random() * 4096);
-			
+
 			while (running) {
 				if (num < 100) {
 					num++;
@@ -119,14 +121,14 @@ public class StreamingScalabilityAndLatency {
 			running = false;
 		}
 	}
-	
-	public static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> {
+
+	private static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1876986644706201196L;
 
 		private long maxLatency;
-		private long count; 
-		
+		private long count;
+
 		@Override
 		public void invoke(Tuple2<Long, Long> value) {
 			long ts = value.f1;
@@ -134,7 +136,7 @@ public class StreamingScalabilityAndLatency {
 				long diff = System.currentTimeMillis() - ts;
 				maxLatency = Math.max(diff, maxLatency);
 			}
-			
+
 			count++;
 			if (count == 5000) {
 				System.out.println("Max latency: " + maxLatency);
@@ -144,7 +146,7 @@ public class StreamingScalabilityAndLatency {
 		}
 	}
 
-	public static class IdMapper<T> implements MapFunction<T, T> {
+	private static class IdMapper<T> implements MapFunction<T, T> {
 
 		private static final long serialVersionUID = -6543809409233225099L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
index 1c5744d..bd5123a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
@@ -21,4 +21,5 @@
  * need to be manually invoked, because they are extremely heavy, time intensive,
  * of require larger-than-usual JVMs.
  */
-package org.apache.flink.test.manual;
\ No newline at end of file
+
+package org.apache.flink.test.manual;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index eea2509..1fb5e65 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.test.misc;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -33,6 +30,7 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -41,6 +39,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 /**
  * This test verifies that the auto parallelism is properly forwarded to the runtime.
  */
@@ -79,7 +80,6 @@ public class AutoParallelismITCase extends TestLogger {
 		}
 	}
 
-
 	@Test
 	public void testProgramWithAutoParallelism() {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
index 39a08d2..b8f1d80 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
@@ -24,8 +24,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.test.util.JavaProgramTestBase;
+
 import org.junit.Assert;
 
+/**
+ * Integration tests for custom {@link Partitioner}.
+ */
 @SuppressWarnings("serial")
 public class CustomPartitioningITCase extends JavaProgramTestBase {
 
@@ -36,17 +40,17 @@ public class CustomPartitioningITCase extends JavaProgramTestBase {
 		if (!isCollectionExecution()) {
 			Assert.assertTrue(env.getParallelism() > 1);
 		}
-		
+
 		env.generateSequence(1, 1000)
 			.partitionCustom(new AllZeroPartitioner(), new IdKeySelector<Long>())
 			.map(new FailExceptInPartitionZeroMapper())
 			.output(new DiscardingOutputFormat<Long>());
-		
+
 		env.execute();
 	}
-	
-	public static class FailExceptInPartitionZeroMapper extends RichMapFunction<Long, Long> {
-		
+
+	private static class FailExceptInPartitionZeroMapper extends RichMapFunction<Long, Long> {
+
 		@Override
 		public Long map(Long value) throws Exception {
 			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
@@ -56,15 +60,15 @@ public class CustomPartitioningITCase extends JavaProgramTestBase {
 			}
 		}
 	}
-	
-	public static class AllZeroPartitioner implements Partitioner<Long> {
+
+	private static class AllZeroPartitioner implements Partitioner<Long> {
 		@Override
 		public int partition(Long key, int numPartitions) {
 			return 0;
 		}
 	}
-	
-	public static class IdKeySelector<T> implements KeySelector<T, T> {
+
+	private static class IdKeySelector<T> implements KeySelector<T, T> {
 		@Override
 		public T getKey(T value) {
 			return value;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 76480ba..1532741 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.types.Value;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -41,11 +41,15 @@ import java.io.IOException;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Test for proper error messages in case user-defined serialization is broken
+ * and detected in the network stack.
+ */
 @SuppressWarnings("serial")
 public class CustomSerializationITCase extends TestLogger {
 
 	private static final int PARLLELISM = 5;
-	
+
 	private static LocalFlinkMiniCluster cluster;
 
 	private static TestEnvironment env;
@@ -66,13 +70,13 @@ public class CustomSerializationITCase extends TestLogger {
 		cluster.shutdown();
 		cluster = null;
 	}
-	
+
 	@Test
 	public void testIncorrectSerializer1() {
 		try {
 			env.setParallelism(PARLLELISM);
 			env.getConfig().disableSysoutLogging();
-			
+
 			env
 				.generateSequence(1, 10 * PARLLELISM)
 				.map(new MapFunction<Long, ConsumesTooMuch>() {
@@ -83,7 +87,7 @@ public class CustomSerializationITCase extends TestLogger {
 				})
 				.rebalance()
 				.output(new DiscardingOutputFormat<ConsumesTooMuch>());
-			
+
 			env.execute();
 		}
 		catch (JobExecutionException e) {
@@ -186,11 +190,14 @@ public class CustomSerializationITCase extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Custom Data Types with broken Serialization Logic
 	// ------------------------------------------------------------------------
-	
+
+	/**
+	 * {@link Value} reading more data than written.
+	 */
 	public static class ConsumesTooMuch implements Value {
 
 		@Override
@@ -206,6 +213,9 @@ public class CustomSerializationITCase extends TestLogger {
 		}
 	}
 
+	/**
+	 * {@link Value} reading more buffers than written.
+	 */
 	public static class ConsumesTooMuchSpanning implements Value {
 
 		@Override
@@ -221,6 +231,9 @@ public class CustomSerializationITCase extends TestLogger {
 		}
 	}
 
+	/**
+	 * {@link Value} reading less data than written.
+	 */
 	public static class ConsumesTooLittle implements Value {
 
 		@Override
@@ -236,6 +249,9 @@ public class CustomSerializationITCase extends TestLogger {
 		}
 	}
 
+	/**
+	 * {@link Value} reading fewer buffers than written.
+	 */
 	public static class ConsumesTooLittleSpanning implements Value {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
index fa1fcb6..c004759 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
@@ -21,20 +21,23 @@ package org.apache.flink.test.misc;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+/**
+ * Test TypeInfo serializer tree.
+ */
 public class GenericTypeInfoTest {
 
 	@Test
 	public void testSerializerTree() {
 		@SuppressWarnings("unchecked")
-		TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti = 
-				(TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>) 
+		TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti =
+				(TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>)
 						TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class);
-		
+
 		String serTree = Utils.getSerializerTree(ti);
 		// We can not test against the entire output because the fields of 'String' differ
 		// between java versions
@@ -67,7 +70,7 @@ public class GenericTypeInfoTest {
 				"            lowestSetBit:int\n" +
 				"            firstNonzeroIntNum:int\n" +
 				"    mixed:java.util.List\n" +
-				"    makeMeGeneric:org.apache.flink.test.javaApiOperators.util.CollectionDataSets$PojoWithDateAndEnum\n" +
+				"    makeMeGeneric:org.apache.flink.test.operators.util.CollectionDataSets$PojoWithDateAndEnum\n" +
 				"        group:java.lang.String\n"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 7dab0f1..00b4485 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -30,24 +30,26 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the system behavior in multiple corner cases
  *   - when null records are passed through the system.
  *   - when disjoint dataflows are executed
  *   - when accumulators are used chained after a non-udf operator.
- *   
- * The tests are bundled into one class to reuse the same test cluster. This speeds
+ *
+ * <p>The tests are bundled into one class to reuse the same test cluster. This speeds
  * up test execution, as the majority of the test time goes usually into starting/stopping the
  * test cluster.
  */
@@ -59,7 +61,7 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 	private static LocalFlinkMiniCluster cluster;
 
 	private static TestEnvironment env;
-	
+
 	@BeforeClass
 	public static void startCluster() {
 		Configuration config = new Configuration();
@@ -72,13 +74,13 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 
 		env = new TestEnvironment(cluster, PARALLELISM, false);
 	}
-	
+
 	@AfterClass
 	public static void shutdownCluster() {
 		cluster.shutdown();
 		cluster = null;
 	}
-	
+
 	@Test
 	public void testNullValues() {
 		try {
@@ -128,13 +130,13 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 
 	@Test
 	public void testAccumulatorsAfterNoOp() {
-		
-		final String ACC_NAME = "test_accumulator";
-		
+
+		final String accName = "test_accumulator";
+
 		try {
 			env.setParallelism(6);
 			env.getConfig().disableSysoutLogging();
-			
+
 			env.generateSequence(1, 1000000)
 					.rebalance()
 					.flatMap(new RichFlatMapFunction<Long, Long>() {
@@ -143,7 +145,7 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 
 						@Override
 						public void open(Configuration parameters) {
-							counter = getRuntimeContext().getLongCounter(ACC_NAME);
+							counter = getRuntimeContext().getLongCounter(accName);
 						}
 
 						@Override
@@ -154,8 +156,8 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 					.output(new DiscardingOutputFormat<Long>());
 
 			JobExecutionResult result = env.execute();
-			
-			assertEquals(1000000L, result.getAllAccumulatorResults().get(ACC_NAME));
+
+			assertEquals(1000000L, result.getAllAccumulatorResults().get(accName));
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index a5103cc..fd556d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -32,35 +32,41 @@ import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.clustering.util.KMeansData;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+/**
+ * Test that runs an iterative job after a failure in another iterative job.
+ * This test validates that task slots in co-location constraints are properly
+ * freed in the presence of failures.
+ */
 public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 
 	private static final int PARALLELISM = 16;
 	@Test
 	public void testSuccessfulProgramAfterFailure() {
 		LocalFlinkMiniCluster cluster = null;
-		
+
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
-			
+
 			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 
 			TestEnvironment env = new TestEnvironment(cluster, PARALLELISM, false);
-			
+
 			try {
 				runConnectedComponents(env);
 			}
@@ -68,7 +74,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 				e.printStackTrace();
 				fail("Program Execution should have succeeded.");
 			}
-	
+
 			try {
 				runKMeans(env);
 				fail("This program execution should have failed.");
@@ -76,7 +82,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 			catch (JobExecutionException e) {
 				assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers"));
 			}
-	
+
 			try {
 				runConnectedComponents(env);
 			}
@@ -95,9 +101,9 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 			}
 		}
 	}
-	
+
 	private static void runConnectedComponents(ExecutionEnvironment env) throws Exception {
-		
+
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -166,7 +172,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 				.map(new KMeans.SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
 
 		clusteredPoints.output(new DiscardingOutputFormat<Tuple2<Integer, KMeans.Point>>());
-		
+
 		env.execute("KMeans Example");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
new file mode 100644
index 0000000..b4bd213
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.ValueCollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for aggregations.
+ */
+@RunWith(Parameterized.class)
+public class AggregateITCase extends MultipleProgramsTestBase {
+
+	public AggregateITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testFullAggregate() throws Exception {
+		/*
+		 * Full Aggregate
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<Integer, Long>> aggregateDs = ds
+				.aggregate(Aggregations.SUM, 0)
+				.and(Aggregations.MAX, 1)
+				.project(0, 1);
+
+		List<Tuple2<Integer, Long>> result = aggregateDs.collect();
+
+		String expected = "231,6\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testFullAggregateOfMutableValueTypes() throws Exception {
+		/*
+		 * Full Aggregate of mutable value types
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds
+				.aggregate(Aggregations.SUM, 0)
+				.and(Aggregations.MAX, 1)
+				.project(0, 1);
+
+		List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
+
+		String expected = "231,6\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testGroupedAggregate() throws Exception {
+		/*
+		 * Grouped Aggregate
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+				.aggregate(Aggregations.SUM, 0)
+				.project(1, 0);
+
+		List<Tuple2<Long, Integer>> result = aggregateDs.collect();
+
+		String expected = "1,1\n" +
+				"2,5\n" +
+				"3,15\n" +
+				"4,34\n" +
+				"5,65\n" +
+				"6,111\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testGroupedAggregateOfMutableValueTypes() throws Exception {
+		/*
+		 * Grouped Aggregate of mutable value types
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds.groupBy(1)
+				.aggregate(Aggregations.SUM, 0)
+				.project(1, 0);
+
+		List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
+
+		String expected = "1,1\n" +
+				"2,5\n" +
+				"3,15\n" +
+				"4,34\n" +
+				"5,65\n" +
+				"6,111\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testNestedAggregate() throws Exception {
+		/*
+		 * Nested Aggregate
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+				.aggregate(Aggregations.MIN, 0)
+				.aggregate(Aggregations.MIN, 0)
+				.project(0);
+
+		List<Tuple1<Integer>> result = aggregateDs.collect();
+
+		String expected = "1\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testNestedAggregateOfMutableValueTypes() throws Exception {
+		/*
+		 * Nested Aggregate of mutable value types
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1)
+				.aggregate(Aggregations.MIN, 0)
+				.aggregate(Aggregations.MIN, 0)
+				.project(0);
+
+		List<Tuple1<IntValue>> result = aggregateDs.collect();
+
+		String expected = "1\n";
+
+		compareResultAsTuples(result, expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
new file mode 100644
index 0000000..4108b24
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+
+/**
+ * Integration tests for {@link CoGroupFunction}.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupGroupSortITCase extends JavaProgramTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> input1 = env.fromElements(
+				new Tuple2<Long, Long>(0L, 5L),
+				new Tuple2<Long, Long>(0L, 4L),
+				new Tuple2<Long, Long>(0L, 3L),
+				new Tuple2<Long, Long>(0L, 2L),
+				new Tuple2<Long, Long>(0L, 1L),
+				new Tuple2<Long, Long>(1L, 10L),
+				new Tuple2<Long, Long>(1L, 8L),
+				new Tuple2<Long, Long>(1L, 9L),
+				new Tuple2<Long, Long>(1L, 7L));
+
+		DataSet<TestPojo> input2 = env.fromElements(
+				new TestPojo(0L, 10L, 3L),
+				new TestPojo(0L, 8L, 3L),
+				new TestPojo(0L, 10L, 1L),
+				new TestPojo(0L, 9L, 0L),
+				new TestPojo(0L, 8L, 2L),
+				new TestPojo(0L, 8L, 4L),
+				new TestPojo(1L, 10L, 3L),
+				new TestPojo(1L, 8L, 3L),
+				new TestPojo(1L, 10L, 1L),
+				new TestPojo(1L, 9L, 0L),
+				new TestPojo(1L, 8L, 2L),
+				new TestPojo(1L, 8L, 4L));
+
+		input1.coGroup(input2)
+		.where(1).equalTo("b")
+		.sortFirstGroup(0, Order.DESCENDING)
+		.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
+
+		.with(new ValidatingCoGroup())
+		.output(new DiscardingOutputFormat<NullValue>());
+
+		env.execute();
+	}
+
+	private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> {
+		// Checks the group orders requested by sortFirstGroup/sortSecondGroup in testProgram(); emits no records.
+		@Override
+		public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception {
+			// validate the tuple input: sorted on field 0, descending (field 1 is the
+			// coGroup key, so it is constant within a group and cannot reveal the order)
+			{
+				long lastValue = Long.MAX_VALUE;
+
+				for (Tuple2<Long, Long> t : first) {
+					long current = t.f0;
+					Assert.assertTrue(current <= lastValue);
+					lastValue = current;
+				}
+			}
+
+			// validate the pojo input: "c" ascending, ties broken by "a" descending;
+			// the sentinel (a = MAX_VALUE, c = MIN_VALUE) lets the first element always pass
+			{
+				TestPojo lastValue = new TestPojo(Long.MAX_VALUE, 0, Long.MIN_VALUE);
+
+				for (TestPojo current : second) {
+					Assert.assertTrue(current.c >= lastValue.c);
+					Assert.assertTrue(current.c != lastValue.c || current.a <= lastValue.a);
+					lastValue = current;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Test POJO.
+	 */
+	public static class TestPojo implements Cloneable {
+		public long a;
+		public long b;
+		public long c;
+
+		public TestPojo() {}
+
+		public TestPojo(long a, long b, long c) {
+			this.a = a;
+			this.b = b;
+			this.c = c;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
new file mode 100644
index 0000000..453f525
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -0,0 +1,989 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.operators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Integration tests for {@link CoGroupFunction} and {@link RichCoGroupFunction}.
+ */
+@RunWith(Parameterized.class)
+public class CoGroupITCase extends MultipleProgramsTestBase {
+
+	public CoGroupITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	/*
+	 * CoGroup on tuples with key field selector
+	 */
+	@Test
+	public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup());
+
+		List<Tuple2<Integer, Integer>> result = coGroupDs.collect();
+
+		String expected = "1,0\n" +
+				"2,6\n" +
+				"3,24\n" +
+				"4,60\n" +
+				"5,120\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
+		/*
+		 * CoGroup on two custom type inputs with key extractors
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector4()).equalTo(new
+				KeySelector5()).with(new CustomTypeCoGroup());
+
+		List<CustomType> result = coGroupDs.collect();
+
+		String expected = "1,0,test\n" +
+				"2,6,test\n" +
+				"3,24,test\n" +
+				"4,60,test\n" +
+				"5,120,test\n" +
+				"6,210,test\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	private static class KeySelector4 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
+		}
+	}
+
+	private static class KeySelector5 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
+		}
+	}
+
+	@Test
+	public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception {
+		/*
+		 * check correctness of cogroup if UDF returns left input objects
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft());
+
+		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
+
+		String expected = "1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n" +
+				"5,3,I am fine.\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception {
+		/*
+		 * check correctness of cogroup if UDF returns right input objects
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight());
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = coGroupDs.collect();
+
+		String expected = "1,1,0,Hallo,1\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"2,3,2,Hallo Welt wie,1\n" +
+				"3,4,3,Hallo Welt wie gehts?,2\n" +
+				"3,5,4,ABC,2\n" +
+				"3,6,5,BCD,3\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCoGroupWithBroadcastSet() throws Exception {
+		/*
+		 * CoGroup with broadcast set
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
+
+		List<Tuple3<Integer, Integer, Integer>> result = coGroupDs.collect();
+
+		String expected = "1,0,55\n" +
+				"2,6,55\n" +
+				"3,24,55\n" +
+				"4,60,55\n" +
+				"5,120,55\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+			throws Exception {
+		/*
+		 * CoGroup on a tuple input with key field selector and a custom type input with key extractor
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new
+				KeySelector2()).with(new MixedCoGroup());
+
+		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
+
+		String expected = "0,1,test\n" +
+				"1,2,test\n" +
+				"2,5,test\n" +
+				"3,15,test\n" +
+				"4,33,test\n" +
+				"5,63,test\n" +
+				"6,109,test\n" +
+				"7,4,test\n" +
+				"8,4,test\n" +
+				"9,4,test\n" +
+				"10,5,test\n" +
+				"11,5,test\n" +
+				"12,5,test\n" +
+				"13,5,test\n" +
+				"14,5,test\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	private static class KeySelector2 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
+		}
+	}
+
+	@Test
+	public void testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector()
+			throws Exception {
+		/*
+		 * CoGroup on a custom type input with key extractor and a tuple input with key field selector
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector3()).equalTo(2).with
+				(new MixedCoGroup2());
+
+		List<CustomType> result = coGroupDs.collect();
+
+		String expected = "0,1,test\n" +
+				"1,2,test\n" +
+				"2,5,test\n" +
+				"3,15,test\n" +
+				"4,33,test\n" +
+				"5,63,test\n" +
+				"6,109,test\n" +
+				"7,4,test\n" +
+				"8,4,test\n" +
+				"9,4,test\n" +
+				"10,5,test\n" +
+				"11,5,test\n" +
+				"12,5,test\n" +
+				"13,5,test\n" +
+				"14,5,test\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	private static class KeySelector3 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
+		}
+	}
+
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception {
+		/*
+		 * CoGroup with multiple key fields
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+				where(0, 4).equalTo(0, 1).with(new Tuple5Tuple3CoGroup());
+
+		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+
+		String expected = "1,1,Hallo\n" +
+				"2,2,Hallo Welt\n" +
+				"3,2,Hallo Welt wie gehts?\n" +
+				"3,2,ABC\n" +
+				"5,3,HIJ\n" +
+				"5,3,IJK\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
+		/*
+		 * CoGroup with multiple key fields
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+				where(new KeySelector7()).
+				equalTo(new KeySelector8()).with(new Tuple5Tuple3CoGroup());
+
+		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+
+		String expected = "1,1,Hallo\n" +
+				"2,2,Hallo Welt\n" +
+				"3,2,Hallo Welt wie gehts?\n" +
+				"3,2,ABC\n" +
+				"5,3,HIJ\n" +
+				"5,3,IJK\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
+		/*
+		 * CoGroup with multiple key fields, test working closure cleaner for inner classes
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+				where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+						Tuple2<Integer, Long>>() {
+					@Override
+					public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+						return new Tuple2<Integer, Long>(t.f0, t.f4);
+					}
+				}).
+				equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
+
+					@Override
+					public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
+						return new Tuple2<>(t.f0, t.f1);
+					}
+				}).
+				with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+					@Override
+					public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+							Iterable<Tuple3<Integer, Long, String>> second,
+							Collector<Tuple3<Integer, Long, String>> out) {
+						List<String> strs = new ArrayList<>();
+
+						for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+							strs.add(t.f3);
+						}
+
+						for (Tuple3<Integer, Long, String> t : second) {
+							for (String s : strs) {
+								out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+							}
+						}
+					}
+				});
+
+		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+
+		String expected = "1,1,Hallo\n" +
+				"2,2,Hallo Welt\n" +
+				"3,2,Hallo Welt wie gehts?\n" +
+				"3,2,ABC\n" +
+				"5,3,HIJ\n" +
+				"5,3,IJK\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception {
+		/*
+		 * CoGroup with multiple key fields, test that disabling closure cleaner leads to an exception when using inner
+		 * classes.
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableClosureCleaner();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+		boolean correctExceptionTriggered = false;
+		try {
+			DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+					where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+							Tuple2<Integer, Long>>() {
+						@Override
+						public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+							return new Tuple2<Integer, Long>(t.f0, t.f4);
+						}
+					}).
+					equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
+
+						@Override
+						public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
+							return new Tuple2<Integer, Long>(t.f0, t.f1);
+						}
+					}).
+					with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+						@Override
+						public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+									Iterable<Tuple3<Integer, Long, String>> second,
+									Collector<Tuple3<Integer, Long, String>> out) {
+							List<String> strs = new ArrayList<String>();
+
+							for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+								strs.add(t.f3);
+							}
+
+							for (Tuple3<Integer, Long, String> t : second) {
+								for (String s : strs) {
+									out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+								}
+							}
+						}
+					});
+		} catch (InvalidProgramException ex) {
+			correctExceptionTriggered = (ex.getCause() instanceof java.io.NotSerializableException);
+		}
+		Assert.assertTrue(correctExceptionTriggered);
+
+	}
+
+	private static class KeySelector7 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+	Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
+			return new Tuple2<Integer, Long>(t.f0, t.f4);
+		}
+	}
+
+	private static class KeySelector8 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
+			return new Tuple2<Integer, Long>(t.f0, t.f1);
+		}
+	}
+
+	@Test
+	public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception {
+		/*
+		 * CoGroup on two custom type inputs using expression keys
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup());
+
+		List<CustomType> result = coGroupDs.collect();
+
+		String expected = "1,0,test\n" +
+				"2,6,test\n" +
+				"3,24,test\n" +
+				"4,60,test\n" +
+				"5,120,test\n" +
+				"6,210,test\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	@Test
+	public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws
+			Exception {
+		/*
+		 * CoGroup on a POJO input with an expression key and a tuple input with a key field selector
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+				.where("nestedPojo.longNumber").equalTo(6).with(new CoGroup1());
+
+		List<CustomType> result = coGroupDs.collect();
+
+		String expected = 	"-1,20000,Flink\n" +
+				"-1,10000,Flink\n" +
+				"-1,30000,Flink\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	private static class CoGroup1 implements CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<POJO> first,
+				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+				Collector<CustomType> out) throws Exception {
+			for (POJO p : first) {
+				for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+					Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+				}
+			}
+		}
+	}
+
+	@Test
+	public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception {
+		/*
+		 * CoGroup field-selector (expression keys) + key selector function
+		 * The key selector is unnecessarily complicated (Tuple1) ;)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+				.where(new KeySelector6()).equalTo(6).with(new CoGroup3());
+
+		List<CustomType> result = coGroupDs.collect();
+
+		String expected = 	"-1,20000,Flink\n" +
+				"-1,10000,Flink\n" +
+				"-1,30000,Flink\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	private static class KeySelector6 implements KeySelector<POJO, Tuple1<Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple1<Long> getKey(POJO value)
+		throws Exception {
+			return new Tuple1<Long>(value.nestedPojo.longNumber);
+		}
+	}
+
+	private static class CoGroup3 implements CoGroupFunction<POJO, Tuple7<Integer,
+			String, Integer, Integer, Long, String, Long>, CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<POJO> first,
+				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+				Collector<CustomType> out) throws Exception {
+			for (POJO p : first) {
+				for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+					Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+				}
+			}
+		}
+	}
+
+	@Test
+	public void testCoGroupFieldSelectorAndKeySelector() throws Exception {
+		/*
+		 * CoGroup field-selector (expression keys) + key selector function
+		 * The key selector is simple here
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+				.where(new KeySelector1()).equalTo(6).with(new CoGroup2());
+
+		List<CustomType> result = coGroupDs.collect();
+
+		String expected = "-1,20000,Flink\n" +
+				"-1,10000,Flink\n" +
+				"-1,30000,Flink\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	@Test
+	public void testCoGroupWithAtomicType1() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> ds2 = env.fromElements(0, 1, 2);
+
+		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*").with(new CoGroupAtomic1());
+
+		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
+
+		String expected = "(1,1,Hi)\n" +
+			"(2,2,Hello)";
+
+		compareResultAsText(result, expected);
+	}
+
+	@Test
+	public void testCoGroupWithAtomicType2() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds1 = env.fromElements(0, 1, 2);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0).with(new CoGroupAtomic2());
+
+		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
+
+		String expected = "(1,1,Hi)\n" +
+			"(2,2,Hello)";
+
+		compareResultAsText(result, expected);
+	}
+
+	@Test
+	public void testCoGroupWithRangePartitioning() throws Exception {
+		/*
+		 * Test coGroup on tuples with multiple key field positions and same customized distribution
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+		env.setParallelism(4);
+		TestDistribution testDis = new TestDistribution();
+		DataSet<Tuple3<Integer, Long, String>> coGrouped =
+				DataSetUtils.partitionByRange(ds1, testDis, 0, 4)
+						.coGroup(DataSetUtils.partitionByRange(ds2, testDis, 0, 1))
+						.where(0, 4)
+						.equalTo(0, 1)
+						.with(new Tuple5Tuple3CoGroup());
+
+		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+
+		String expected = "1,1,Hallo\n" +
+				"2,2,Hallo Welt\n" +
+				"3,2,Hallo Welt wie gehts?\n" +
+				"3,2,ABC\n" +
+				"5,3,HIJ\n" +
+				"5,3,IJK\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  UDF classes
+	// --------------------------------------------------------------------------------------------
+
+	private static class KeySelector1 implements KeySelector<POJO, Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long getKey(POJO value)
+		throws Exception {
+			return value.nestedPojo.longNumber;
+		}
+	}
+
+	private static class CoGroup2 implements CoGroupFunction<POJO, Tuple7<Integer, String,
+			Integer, Integer, Long, String, Long>, CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<POJO> first,
+				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+				Collector<CustomType> out) throws Exception {
+			for (POJO p : first) {
+				for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+					Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+				}
+			}
+		}
+	}
+
+	private static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<Tuple2<Integer, Integer>> out) {
+			int sum = 0;
+			int id = 0;
+
+			for (Tuple5<Integer, Long, Integer, String, Long> element : first) {
+				sum += element.f2;
+				id = element.f0;
+			}
+
+			for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
+				sum += element.f2;
+				id = element.f0;
+			}
+
+			out.collect(new Tuple2<Integer, Integer>(id, sum));
+		}
+	}
+
+	private static class CustomTypeCoGroup implements CoGroupFunction<CustomType, CustomType, CustomType> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<CustomType> first, Iterable<CustomType> second, Collector<CustomType> out) {
+
+			CustomType o = new CustomType(0, 0, "test");
+
+			for (CustomType element : first) {
+				o.myInt = element.myInt;
+				o.myLong += element.myLong;
+			}
+
+			for (CustomType element : second) {
+				o.myInt = element.myInt;
+				o.myLong += element.myLong;
+			}
+
+			out.collect(o);
+		}
+	}
+
+	private static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<CustomType> second,
+				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+
+			long sum = 0;
+			int id = 0;
+
+			for (Tuple5<Integer, Long, Integer, String, Long> element : first) {
+				sum += element.f0;
+				id = element.f2;
+			}
+
+			for (CustomType element : second) {
+				id = element.myInt;
+				sum += element.myLong;
+			}
+
+			out.collect(new Tuple3<Integer, Long, String>(id, sum, "test"));
+		}
+
+	}
+
+	/**
+	 * CoGroup function joining a {@link CustomType} input with a 5-tuple input. Emits one aggregated
+	 * {@link CustomType} per group: {@code myLong} sums the {@code myLong} values of the first input and the
+	 * {@code f0} values of the second input; {@code myInt} comes from the last element iterated.
+	 */
+	private static class MixedCoGroup2 implements CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<CustomType> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<CustomType> out) {
+			final CustomType aggregate = new CustomType(0, 0, "test");
+
+			for (CustomType custom : first) {
+				aggregate.myInt = custom.myInt;
+				aggregate.myLong += custom.myLong;
+			}
+
+			// on the tuple side f2 plays the role of the id and f0 the role of the summand
+			for (Tuple5<Integer, Long, Integer, String, Long> tuple : second) {
+				aggregate.myInt = tuple.f2;
+				aggregate.myLong += tuple.f0;
+			}
+
+			out.collect(aggregate);
+		}
+	}
+
+	/**
+	 * CoGroup function that forwards the unmodified elements of the first (left) input whose {@code f0}
+	 * is smaller than 6 and ignores the second input.
+	 */
+	private static class Tuple3ReturnLeft implements CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, Long, String>> first,
+				Iterable<Tuple3<Integer, Long, String>> second,
+				Collector<Tuple3<Integer, Long, String>> out) {
+			for (Tuple3<Integer, Long, String> candidate : first) {
+				if (candidate.f0 >= 6) {
+					continue;
+				}
+				// the input object itself is emitted, not a copy
+				out.collect(candidate);
+			}
+		}
+	}
+
+	/**
+	 * CoGroup function that forwards the unmodified elements of the second (right) input whose {@code f0}
+	 * is smaller than 4 and ignores the first input.
+	 */
+	private static class Tuple5ReturnRight implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<Tuple5<Integer, Long, Integer, String, Long>> out) {
+			for (Tuple5<Integer, Long, Integer, String, Long> candidate : second) {
+				if (candidate.f0 >= 4) {
+					continue;
+				}
+				// the input object itself is emitted, not a copy
+				out.collect(candidate);
+			}
+		}
+	}
+
+	/**
+	 * Rich CoGroup function that emits {@code (id, sum of f2 over both inputs, broadcast sum)} per group.
+	 * The broadcast sum is computed once in {@link #open(Configuration)} from the broadcast variable
+	 * named "ints".
+	 */
+	private static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		// overwritten in open() with the sum of the broadcast variable
+		private int broadcast = 42;
+
+		@Override
+		public void open(Configuration config) {
+			final Collection<Integer> broadcastValues = getRuntimeContext().getBroadcastVariable("ints");
+
+			int total = 0;
+			for (Integer value : broadcastValues) {
+				total += value;
+			}
+			broadcast = total;
+		}
+
+		@Override
+		public void coGroup(
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<Tuple3<Integer, Integer, Integer>> out) {
+			int total = 0;
+			int lastId = 0;
+
+			for (Tuple5<Integer, Long, Integer, String, Long> tuple : first) {
+				total += tuple.f2;
+				lastId = tuple.f0;
+			}
+
+			for (Tuple5<Integer, Long, Integer, String, Long> tuple : second) {
+				total += tuple.f2;
+				lastId = tuple.f0;
+			}
+
+			Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>(lastId, total, broadcast);
+			out.collect(result);
+		}
+	}
+
+	/**
+	 * CoGroup function that pairs every 3-tuple of the second input with every {@code f3} string of the
+	 * first input, emitting {@code (t.f0, t.f1, string)} for each combination.
+	 */
+	private static class Tuple5Tuple3CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple3<Integer, Long, String>> second,
+				Collector<Tuple3<Integer, Long, String>> out) {
+			// buffer the strings of the first input so they can be replayed for every element of the second
+			final List<String> bufferedStrings = new ArrayList<String>();
+			for (Tuple5<Integer, Long, Integer, String, Long> left : first) {
+				bufferedStrings.add(left.f3);
+			}
+
+			for (Tuple3<Integer, Long, String> right : second) {
+				for (String text : bufferedStrings) {
+					Tuple3<Integer, Long, String> combined = new Tuple3<Integer, Long, String>(right.f0, right.f1, text);
+					out.collect(combined);
+				}
+			}
+		}
+	}
+
+	/**
+	 * CoGroup function with a 3-tuple first input and an atomic {@link Integer} second input. Emits a tuple
+	 * of the first input once for every integer of the second input that equals the tuple's {@code f0}.
+	 */
+	private static class CoGroupAtomic1 implements CoGroupFunction<Tuple3<Integer, Long, String>, Integer, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, Iterable<Integer> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			// buffer the atomic side so it can be scanned once per tuple
+			final List<Integer> bufferedKeys = new ArrayList<Integer>();
+			for (Integer key : second) {
+				bufferedKeys.add(key);
+			}
+
+			for (Tuple3<Integer, Long, String> tuple : first) {
+				for (Integer key : bufferedKeys) {
+					if (!tuple.f0.equals(key)) {
+						continue;
+					}
+					out.collect(tuple);
+				}
+			}
+		}
+	}
+
+	/**
+	 * CoGroup function with an atomic {@link Integer} first input and a 3-tuple second input. Emits a tuple
+	 * of the second input once for every integer of the first input that equals the tuple's {@code f0}.
+	 */
+	private static class CoGroupAtomic2 implements CoGroupFunction<Integer, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Integer> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			// buffer the atomic side so it can be scanned once per tuple
+			final List<Integer> bufferedKeys = new ArrayList<Integer>();
+			for (Integer key : first) {
+				bufferedKeys.add(key);
+			}
+
+			for (Tuple3<Integer, Long, String> tuple : second) {
+				for (Integer key : bufferedKeys) {
+					if (!tuple.f0.equals(key)) {
+						continue;
+					}
+					out.collect(tuple);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Test {@link DataDistribution} with four hard-coded bucket boundaries over two key fields
+	 * (an int field followed by a long field).
+	 *
+	 * <p>{@link #equals(Object)} treats every {@code TestDistribution} as equal to every other one, and
+	 * {@link #hashCode()} is kept consistent with that.
+	 */
+	public static class TestDistribution implements DataDistribution {
+		public Object[][] boundaries = new Object[][]{
+				new Object[]{2, 2L},
+				new Object[]{5, 4L},
+				new Object[]{10, 12L},
+				new Object[]{21, 6L}
+		};
+
+		public TestDistribution() {}
+
+		/**
+		 * Returns the boundary stored at index {@code bucketNum}; {@code totalNumBuckets} is not consulted.
+		 */
+		@Override
+		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+			return boundaries[bucketNum];
+		}
+
+		@Override
+		public int getNumberOfFields() {
+			return 2;
+		}
+
+		@Override
+		public TypeInformation[] getKeyTypes() {
+			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			// intentionally empty: the boundaries are set by the field initializer of every new instance
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			// intentionally empty: nothing is written by write(DataOutputView)
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestDistribution;
+		}
+
+		/**
+		 * All instances are equal according to {@link #equals(Object)}, so they must share one hash code.
+		 */
+		@Override
+		public int hashCode() {
+			return TestDistribution.class.getName().hashCode();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
new file mode 100644
index 0000000..6e61f60
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.RichCrossFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Integration tests for {@link CrossFunction} and {@link RichCrossFunction}.
+ */
+@RunWith(Parameterized.class)
+public class CrossITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * Creates the test case for the given execution mode.
+	 */
+	public CrossITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Checks the correctness of a cross on two tuple inputs.
+	 */
+	@Test
+	public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Integer, String>> crossed = left.cross(right).with(new Tuple5Cross());
+
+		List<Tuple2<Integer, String>> actual = crossed.collect();
+
+		String expected =
+				"0,HalloHallo\n"
+				+ "1,HalloHallo Welt\n"
+				+ "2,HalloHallo Welt wie\n"
+				+ "1,Hallo WeltHallo\n"
+				+ "2,Hallo WeltHallo Welt\n"
+				+ "3,Hallo WeltHallo Welt wie\n"
+				+ "2,Hallo Welt wieHallo\n"
+				+ "3,Hallo Welt wieHallo Welt\n"
+				+ "4,Hallo Welt wieHallo Welt wie\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Checks the correctness of a cross whose UDF returns the left input object.
+	 */
+	@Test
+	public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> crossed = left.cross(right).with(new Tuple3ReturnLeft());
+
+		List<Tuple3<Integer, Long, String>> actual = crossed.collect();
+
+		// every left element is expected three times
+		String expected =
+				"1,1,Hi\n"
+				+ "1,1,Hi\n"
+				+ "1,1,Hi\n"
+				+ "2,2,Hello\n"
+				+ "2,2,Hello\n"
+				+ "2,2,Hello\n"
+				+ "3,2,Hello world\n"
+				+ "3,2,Hello world\n"
+				+ "3,2,Hello world\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Checks the correctness of a cross whose UDF returns the right input object.
+	 */
+	@Test
+	public void testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossed = left.cross(right).with(new Tuple5ReturnRight());
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> actual = crossed.collect();
+
+		// every right element is expected three times
+		String expected =
+				"1,1,0,Hallo,1\n"
+				+ "1,1,0,Hallo,1\n"
+				+ "1,1,0,Hallo,1\n"
+				+ "2,2,1,Hallo Welt,2\n"
+				+ "2,2,1,Hallo Welt,2\n"
+				+ "2,2,1,Hallo Welt,2\n"
+				+ "2,3,2,Hallo Welt wie,1\n"
+				+ "2,3,2,Hallo Welt wie,1\n"
+				+ "2,3,2,Hallo Welt wie,1\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Checks the correctness of a cross that reads a broadcast set named "ints".
+	 */
+	@Test
+	public void testCorrectnessOfCrossWithBroadcastSet() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> broadcastInts = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Integer, Integer>> crossed = left
+				.cross(right)
+				.with(new Tuple5CrossBC())
+				.withBroadcastSet(broadcastInts, "ints");
+
+		List<Tuple3<Integer, Integer, Integer>> actual = crossed.collect();
+
+		String expected =
+				"2,0,55\n"
+				+ "3,0,55\n"
+				+ "3,0,55\n"
+				+ "3,0,55\n"
+				+ "4,1,55\n"
+				+ "4,2,55\n"
+				+ "3,0,55\n"
+				+ "4,2,55\n"
+				+ "4,4,55\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Checks the correctness of crossWithHuge. Only the result is verified; it should be the same as with
+	 * a normal cross.
+	 */
+	@Test
+	public void testCorrectnessOfCrossWithHuge() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Integer, String>> crossed = left.crossWithHuge(right).with(new Tuple5Cross());
+
+		List<Tuple2<Integer, String>> actual = crossed.collect();
+
+		String expected =
+				"0,HalloHallo\n"
+				+ "1,HalloHallo Welt\n"
+				+ "2,HalloHallo Welt wie\n"
+				+ "1,Hallo WeltHallo\n"
+				+ "2,Hallo WeltHallo Welt\n"
+				+ "3,Hallo WeltHallo Welt wie\n"
+				+ "2,Hallo Welt wieHallo\n"
+				+ "3,Hallo Welt wieHallo Welt\n"
+				+ "4,Hallo Welt wieHallo Welt wie\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Checks the correctness of crossWithTiny. Only the result is verified; it should be the same as with
+	 * a normal cross.
+	 */
+	@Test
+	public void testCorrectnessOfCrossWithTiny() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Integer, String>> crossed = left.crossWithTiny(right).with(new Tuple5Cross());
+
+		List<Tuple2<Integer, String>> actual = crossed.collect();
+
+		String expected =
+				"0,HalloHallo\n"
+				+ "1,HalloHallo Welt\n"
+				+ "2,HalloHallo Welt wie\n"
+				+ "1,Hallo WeltHallo\n"
+				+ "2,Hallo WeltHallo Welt\n"
+				+ "3,Hallo WeltHallo Welt wie\n"
+				+ "2,Hallo Welt wieHallo\n"
+				+ "3,Hallo Welt wieHallo Welt\n"
+				+ "4,Hallo Welt wieHallo Welt wie\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Checks a projected cross on tuple inputs, starting the projection with fields of the first input.
+	 */
+	@Test
+	public void testProjectCrossOnATupleInput1() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+
+		// the target type of the assignment fixes the projected tuple type, so the variable is kept
+		DataSet<Tuple6<String, Long, String, Integer, Long, Long>> projected = left.cross(right)
+				.projectFirst(2, 1)
+				.projectSecond(3)
+				.projectFirst(0)
+				.projectSecond(4, 1);
+
+		List<Tuple6<String, Long, String, Integer, Long, Long>> actual = projected.collect();
+
+		String expected =
+				"Hi,1,Hallo,1,1,1\n"
+				+ "Hi,1,Hallo Welt,1,2,2\n"
+				+ "Hi,1,Hallo Welt wie,1,1,3\n"
+				+ "Hello,2,Hallo,2,1,1\n"
+				+ "Hello,2,Hallo Welt,2,2,2\n"
+				+ "Hello,2,Hallo Welt wie,2,1,3\n"
+				+ "Hello world,2,Hallo,3,1,1\n"
+				+ "Hello world,2,Hallo Welt,3,2,2\n"
+				+ "Hello world,2,Hallo Welt wie,3,1,3\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Checks a projected cross on tuple inputs, starting the projection with a field of the second input.
+	 */
+	@Test
+	public void testProjectCrossOnATupleInput2() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+
+		// the target type of the assignment fixes the projected tuple type, so the variable is kept
+		DataSet<Tuple6<String, String, Long, Long, Long, Integer>> projected = left.cross(right)
+				.projectSecond(3)
+				.projectFirst(2, 1)
+				.projectSecond(4, 1)
+				.projectFirst(0);
+
+		List<Tuple6<String, String, Long, Long, Long, Integer>> actual = projected.collect();
+
+		String expected =
+				"Hallo,Hi,1,1,1,1\n"
+				+ "Hallo Welt,Hi,1,2,2,1\n"
+				+ "Hallo Welt wie,Hi,1,1,3,1\n"
+				+ "Hallo,Hello,2,1,1,2\n"
+				+ "Hallo Welt,Hello,2,2,2,2\n"
+				+ "Hallo Welt wie,Hello,2,1,3,2\n"
+				+ "Hallo,Hello world,2,1,1,3\n"
+				+ "Hallo Welt,Hello world,2,2,2,3\n"
+				+ "Hallo Welt wie,Hello world,2,1,3,3\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Checks the correctness of the default cross, i.e. a cross without a user-defined function.
+	 */
+	@Test
+	public void testCorrectnessOfDefaultCross() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossed = left.cross(right);
+
+		List<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> actual = crossed.collect();
+
+		String expected =
+				"(1,1,Hi),(2,2,1,Hallo Welt,2)\n"
+				+ "(1,1,Hi),(1,1,0,Hallo,1)\n"
+				+ "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n"
+				+ "(2,2,Hello),(2,2,1,Hallo Welt,2)\n"
+				+ "(2,2,Hello),(1,1,0,Hallo,1)\n"
+				+ "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n"
+				+ "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n"
+				+ "(3,2,Hello world),(1,1,0,Hallo,1)\n"
+				+ "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Checks the correctness of a cross on two custom type inputs.
+	 */
+	@Test
+	public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> left = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<CustomType> right = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<CustomType> crossed = left.cross(right).with(new CustomTypeCross());
+
+		List<CustomType> actual = crossed.collect();
+
+		// compared as text, hence no trailing line break after the last record
+		String expected =
+				"1,0,HiHi\n" +
+				"2,1,HiHello\n" +
+				"2,2,HiHello world\n" +
+				"2,1,HelloHi\n" +
+				"4,2,HelloHello\n" +
+				"4,3,HelloHello world\n" +
+				"2,2,Hello worldHi\n" +
+				"4,3,Hello worldHello\n" +
+				"4,4,Hello worldHello world";
+
+		compareResultAsText(actual, expected);
+	}
+
+	/**
+	 * Checks the correctness of a cross of a tuple input with a custom type input.
+	 */
+	@Test
+	public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<CustomType> customs = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> crossed = tuples.cross(customs).with(new MixedCross());
+
+		List<Tuple3<Integer, Long, String>> actual = crossed.collect();
+
+		String expected =
+				"2,0,HalloHi\n"
+				+ "3,0,HalloHello\n"
+				+ "3,0,HalloHello world\n"
+				+ "3,0,Hallo WeltHi\n"
+				+ "4,1,Hallo WeltHello\n"
+				+ "4,2,Hallo WeltHello world\n"
+				+ "3,0,Hallo Welt wieHi\n"
+				+ "4,2,Hallo Welt wieHello\n"
+				+ "4,4,Hallo Welt wieHello world\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Cross function on two 5-tuples that emits the sum of both {@code f2} fields together with the
+	 * concatenation of both {@code f3} strings.
+	 */
+	private static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, String> cross(
+				Tuple5<Integer, Long, Integer, String, Long> first,
+				Tuple5<Integer, Long, Integer, String, Long> second)
+				throws Exception {
+
+			final int summed = first.f2 + second.f2;
+			final String concatenated = first.f3 + second.f3;
+
+			return new Tuple2<Integer, String>(summed, concatenated);
+		}
+	}
+
+	/**
+	 * Cross function on two {@link CustomType} objects that multiplies the {@code myInt} fields, adds the
+	 * {@code myLong} fields and concatenates the {@code myString} fields into a new {@link CustomType}.
+	 */
+	private static class CustomTypeCross implements CrossFunction<CustomType, CustomType, CustomType> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public CustomType cross(CustomType first, CustomType second)
+				throws Exception {
+
+			return new CustomType(
+					first.myInt * second.myInt,
+					first.myLong + second.myLong,
+					first.myString + second.myString);
+		}
+	}
+
+	/**
+	 * Cross function on a 5-tuple and a {@link CustomType} that emits
+	 * {@code (f0 + myInt, f2 * myLong, f3 + myString)}.
+	 */
+	private static class MixedCross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<Integer, Long, String> cross(
+				Tuple5<Integer, Long, Integer, String, Long> first,
+				CustomType second) throws Exception {
+
+			return new Tuple3<Integer, Long, String>(
+					first.f0 + second.myInt,
+					first.f2 * second.myLong,
+					first.f3 + second.myString);
+		}
+	}
+
+	/**
+	 * Cross function that returns its first (left) input object unchanged and ignores the second input.
+	 */
+	private static class Tuple3ReturnLeft implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<Integer, Long, String> cross(
+				Tuple3<Integer, Long, String> first,
+				Tuple5<Integer, Long, Integer, String, Long> second) throws Exception {
+			// the very same instance is handed back, no copy is made
+			final Tuple3<Integer, Long, String> left = first;
+			return left;
+		}
+	}
+
+	/**
+	 * Cross function that returns its second (right) input object unchanged and ignores the first input.
+	 */
+	private static class Tuple5ReturnRight implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple5<Integer, Long, Integer, String, Long> cross(
+				Tuple3<Integer, Long, String> first,
+				Tuple5<Integer, Long, Integer, String, Long> second)
+				throws Exception {
+			// the very same instance is handed back, no copy is made
+			final Tuple5<Integer, Long, Integer, String, Long> right = second;
+			return right;
+		}
+	}
+
+	/**
+	 * Rich cross function on two 5-tuples that emits {@code (f0 + f0, f2 * f2, broadcast sum)}. The
+	 * broadcast sum is computed once in {@link #open(Configuration)} from the broadcast variable
+	 * named "ints".
+	 */
+	private static class Tuple5CrossBC extends RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		// overwritten in open() with the sum of the broadcast variable
+		private int broadcast = 42;
+
+		@Override
+		public void open(Configuration config) {
+			final Collection<Integer> broadcastValues = getRuntimeContext().getBroadcastVariable("ints");
+
+			int total = 0;
+			for (Integer value : broadcastValues) {
+				total += value;
+			}
+			broadcast = total;
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, Integer> cross(
+				Tuple5<Integer, Long, Integer, String, Long> first,
+				Tuple5<Integer, Long, Integer, String, Long> second)
+				throws Exception {
+
+			final int idSum = first.f0 + second.f0;
+			final int product = first.f2 * second.f2;
+
+			return new Tuple3<Integer, Integer, Integer>(idSum, product, broadcast);
+		}
+	}
+}